[jira] [Resolved] (FLINK-32025) Make job cancellation button on UI configurable
[ https://issues.apache.org/jira/browse/FLINK-32025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu resolved FLINK-32025.
Resolution: Duplicate
> Make job cancellation button on UI configurable
>
> Key: FLINK-32025
> URL: https://issues.apache.org/jira/browse/FLINK-32025
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
> Priority: Major
>
> On the Flink job UI, there is a `Cancel Job` button.
> When the job UI is shown to users, it is desirable to hide the button so that
> a normal user doesn't mistakenly cancel a long-running Flink job.
> This issue adds configuration for hiding the `Cancel Job` button.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32025) Make job cancellation button on UI configurable
Ted Yu created FLINK-32025:
Summary: Make job cancellation button on UI configurable
Key: FLINK-32025
URL: https://issues.apache.org/jira/browse/FLINK-32025
Project: Flink
Issue Type: Improvement
Reporter: Ted Yu
On the Flink job UI, there is a `Cancel Job` button. When the job UI is shown to users, it is desirable to hide the button so that a normal user doesn't mistakenly cancel a long-running Flink job. This issue adds configuration for hiding the `Cancel Job` button.
[jira] [Updated] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor
[ https://issues.apache.org/jira/browse/SPARK-42090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated SPARK-42090:
Description:
Previously a boolean variable, saslTimeoutSeen, was used in RetryingBlockTransferor. However, the boolean variable wouldn't cover the following scenario:
1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException
Even though the IOException at #2 is retried (resulting in an increment of retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.
was:
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario:
1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException
Even though the IOException at #2 is retried (resulting in an increment of retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.
> Introduce sasl retry count in RetryingBlockTransferor
>
> Key: SPARK-42090
> URL: https://issues.apache.org/jira/browse/SPARK-42090
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Ted Yu
> Priority: Major
>
> Previously a boolean variable, saslTimeoutSeen, was used in
> RetryingBlockTransferor. However, the boolean variable wouldn't cover the
> following scenario:
> 1. SaslTimeoutException
> 2. IOException
> 3. SaslTimeoutException
> 4. IOException
> Even though the IOException at #2 is retried (resulting in an increment of
> retryCount), the retryCount would be cleared at step #4.
> Since the intention of saslTimeoutSeen is to undo the increment due to
> retrying SaslTimeoutException, we should keep a counter for
> SaslTimeoutException retries and subtract the value of this counter from
> retryCount.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor
Ted Yu created SPARK-42090:
Summary: Introduce sasl retry count in RetryingBlockTransferor
Key: SPARK-42090
URL: https://issues.apache.org/jira/browse/SPARK-42090
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ted Yu
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario:
1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException
Even though the IOException at #2 is retried (resulting in an increment of retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.
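A minimal Java sketch of the two bookkeeping schemes described in SPARK-42090, replaying the four-step scenario from the description. It is a simplified model, not Spark's RetryingBlockTransferor: the `Failure` enum stands in for SaslTimeoutException and IOException, every failure is assumed to be retried, and all class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical, simplified model of the retry bookkeeping discussed in SPARK-42090.
// It is NOT Spark's RetryingBlockTransferor; failures are modeled as an enum.
public class SaslRetryCountSketch {
    enum Failure { SASL_TIMEOUT, IO }

    // Boolean-flag variant: a non-SASL failure after a SASL timeout clears retryCount.
    static final class FlagTracker {
        int retryCount = 0;
        boolean saslTimeoutSeen = false;

        void onRetriedFailure(Failure f) {
            if (f == Failure.SASL_TIMEOUT) {
                saslTimeoutSeen = true;
            } else if (saslTimeoutSeen) {
                retryCount = 0;          // also discards earlier IOException retries
                saslTimeoutSeen = false;
            }
            retryCount++;                // the failure is retried
        }
    }

    // Counter variant: only the increments caused by SASL timeout retries are undone.
    static final class CounterTracker {
        int retryCount = 0;
        int saslRetryCount = 0;

        void onRetriedFailure(Failure f) {
            if (f != Failure.SASL_TIMEOUT && saslRetryCount > 0) {
                retryCount -= saslRetryCount;
                saslRetryCount = 0;
            }
            if (f == Failure.SASL_TIMEOUT) {
                saslRetryCount++;
            }
            retryCount++;                // the failure is retried
        }
    }

    static int flagRetryCount(List<Failure> failures) {
        FlagTracker t = new FlagTracker();
        for (Failure f : failures) t.onRetriedFailure(f);
        return t.retryCount;
    }

    static int counterRetryCount(List<Failure> failures) {
        CounterTracker t = new CounterTracker();
        for (Failure f : failures) t.onRetriedFailure(f);
        return t.retryCount;
    }

    public static void main(String[] args) {
        List<Failure> scenario = List.of(
            Failure.SASL_TIMEOUT, Failure.IO, Failure.SASL_TIMEOUT, Failure.IO);
        System.out.println("flag:    " + flagRetryCount(scenario));    // 1
        System.out.println("counter: " + counterRetryCount(scenario)); // 2
    }
}
```

With the boolean flag, the retry for the IOException at #2 is lost when retryCount is cleared at step #4, so only one IOException retry is counted. With the counter, both IOException retries still count against the retry limit while the SASL timeout retries do not.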
[jira] [Created] (SPARK-41853) Use Map in place of SortedMap for ErrorClassesJsonReader
Ted Yu created SPARK-41853:
Summary: Use Map in place of SortedMap for ErrorClassesJsonReader
Key: SPARK-41853
URL: https://issues.apache.org/jira/browse/SPARK-41853
Project: Spark
Issue Type: Task
Components: Spark Core
Affects Versions: 3.2.3
Reporter: Ted Yu
The use of SortedMap in ErrorClassesJsonReader was mostly to make tests easier to write. This PR replaces SortedMap with Map since SortedMap is slower than Map.
[jira] [Created] (SPARK-41705) Move generate_protos.sh to dev/
Ted Yu created SPARK-41705:
Summary: Move generate_protos.sh to dev/
Key: SPARK-41705
URL: https://issues.apache.org/jira/browse/SPARK-41705
Project: Spark
Issue Type: Task
Components: Connect
Affects Versions: 3.4.0
Reporter: Ted Yu
connector/connect/dev contains only one script. Moving generate_protos.sh to dev/ follows the practice for other scripts.
[jira] [Created] (SPARK-41419) [K8S] Decrement PVC_COUNTER when the pod deletion happens
Ted Yu created SPARK-41419:
Summary: [K8S] Decrement PVC_COUNTER when the pod deletion happens
Key: SPARK-41419
URL: https://issues.apache.org/jira/browse/SPARK-41419
Project: Spark
Issue Type: Task
Components: Kubernetes
Affects Versions: 3.4.0
Reporter: Ted Yu
Commit cc55de3 introduced PVC_COUNTER to track the outstanding number of PVCs.
PVC_COUNTER should only be decremented when the pod deletion happens (in response to an error).
If the PVC isn't created successfully (in which case PVC_COUNTER isn't incremented), possibly because execution never reached the resource(pvc).create() call, we shouldn't decrement the counter.
The variable `success` tracks the progress of PVC creation:
value 0 means the PVC is not created.
value 1 means the PVC has been created.
value 2 means the PVC has been created but, due to a subsequent error, the pod is deleted.
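The `success` states described in SPARK-41419 can be sketched in Java as follows. This is a hypothetical model, not the actual Spark Kubernetes allocator code: the boolean parameters simulate where the error happens, and a static AtomicInteger stands in for PVC_COUNTER.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the bookkeeping described in SPARK-41419; not Spark's code.
// Kubernetes calls are replaced by boolean switches that decide where an error occurs.
public class PvcCounterSketch {
    static final AtomicInteger PVC_COUNTER = new AtomicInteger();

    static final int NOT_CREATED = 0;  // PVC not created
    static final int CREATED = 1;      // PVC created
    static final int POD_DELETED = 2;  // PVC created, then pod deleted after a later error

    /** Simulates one PVC creation attempt and returns the final value of `success`. */
    static int createPvc(boolean failBeforeCreate, boolean failAfterCreate) {
        int success = NOT_CREATED;
        try {
            if (failBeforeCreate) {
                throw new IllegalStateException("error before resource(pvc).create()");
            }
            PVC_COUNTER.incrementAndGet();   // stands in for a successful create()
            success = CREATED;
            if (failAfterCreate) {
                throw new IllegalStateException("error after the PVC was created");
            }
        } catch (IllegalStateException e) {
            if (success == CREATED) {
                // The pod is deleted in response to the error: only now decrement.
                success = POD_DELETED;
                PVC_COUNTER.decrementAndGet();
            }
            // success == NOT_CREATED: the counter was never incremented, so leave it alone.
        }
        return success;
    }

    public static void main(String[] args) {
        System.out.println(createPvc(true, false) + " " + PVC_COUNTER.get());  // 0 0
        System.out.println(createPvc(false, true) + " " + PVC_COUNTER.get());  // 2 0
        System.out.println(createPvc(false, false) + " " + PVC_COUNTER.get()); // 1 1
    }
}
```

Only the path that increments the counter (a successful create()) can lead to a decrement, so an error raised before create() leaves PVC_COUNTER unchanged.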
[jira] [Resolved] (SPARK-41274) Bump Kubernetes Client Version to 6.2.0
[ https://issues.apache.org/jira/browse/SPARK-41274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu resolved SPARK-41274.
Resolution: Duplicate
This is a duplicate of commit 02a2242a45062755bf7e20805958d5bdf1f5ed74.
> Bump Kubernetes Client Version to 6.2.0
>
> Key: SPARK-41274
> URL: https://issues.apache.org/jira/browse/SPARK-41274
> Project: Spark
> Issue Type: Improvement
> Components: Kubernetes
> Affects Versions: 3.4.0
> Reporter: Ted Yu
> Priority: Major
>
> Bump Kubernetes Client Version to 6.2.0
[jira] [Created] (SPARK-41274) Bump Kubernetes Client Version to 6.2.0
Ted Yu created SPARK-41274:
Summary: Bump Kubernetes Client Version to 6.2.0
Key: SPARK-41274
URL: https://issues.apache.org/jira/browse/SPARK-41274
Project: Spark
Issue Type: Improvement
Components: Kubernetes
Affects Versions: 3.4.0
Reporter: Ted Yu
Bump Kubernetes Client Version to 6.2.0
[jira] [Created] (SPARK-41197) Upgrade Kafka version to 3.3 release
Ted Yu created SPARK-41197:
Summary: Upgrade Kafka version to 3.3 release
Key: SPARK-41197
URL: https://issues.apache.org/jira/browse/SPARK-41197
Project: Spark
Issue Type: Improvement
Components: Java API
Affects Versions: 3.3.1
Reporter: Ted Yu
Kafka 3.3 has been released. This issue upgrades the Kafka dependency to the 3.3 release.
[jira] [Updated] (SPARK-40508) Treat unknown partitioning as UnknownPartitioning
[ https://issues.apache.org/jira/browse/SPARK-40508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated SPARK-40508:
Description:
When running a Spark application against Spark 3.3, I see the following:
{code}
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning type: CustomPartitioning
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
{code}
CustomPartitioning works fine with Spark 3.2.1.
This PR proposes to relax the code and treat all unknown partitioning types the same way as UnknownPartitioning.
was:
When running a Spark application against Spark 3.3, I see the following:
```
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning type: CustomPartitioning
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
```
CustomPartitioning works fine with Spark 3.2.1.
This PR proposes to relax the code and treat all unknown partitioning types the same way as UnknownPartitioning.
> Treat unknown partitioning as UnknownPartitioning
>
> Key: SPARK-40508
> URL: https://issues.apache.org/jira/browse/SPARK-40508
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Ted Yu
> Priority: Major
>
> When running a Spark application against Spark 3.3, I see the following:
> {code}
> java.lang.IllegalArgumentException: Unsupported data source V2 partitioning type: CustomPartitioning
>     at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
>     at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
> {code}
> CustomPartitioning works fine with Spark 3.2.1.
> This PR proposes to relax the code and treat all unknown partitioning types the
> same way as UnknownPartitioning.
[jira] [Created] (SPARK-40508) Treat unknown partitioning as UnknownPartitioning
Ted Yu created SPARK-40508:
Summary: Treat unknown partitioning as UnknownPartitioning
Key: SPARK-40508
URL: https://issues.apache.org/jira/browse/SPARK-40508
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.3.0
Reporter: Ted Yu
When running a Spark application against Spark 3.3, I see the following:
```
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning type: CustomPartitioning
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
```
CustomPartitioning works fine with Spark 3.2.1.
This PR proposes to relax the code and treat all unknown partitioning types the same way as UnknownPartitioning.
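The relaxation proposed in SPARK-40508 can be sketched as a fallback branch. The Java types below are hypothetical stand-ins, not Spark's DataSource V2 interfaces: `strict` mirrors the reported exception, and `relaxed` treats any unrecognized partitioning like UnknownPartitioning.

```java
// Hypothetical types standing in for DataSource V2 partitioning reports; they are
// not Spark's org.apache.spark.sql.connector.read.partitioning classes.
public class PartitioningFallbackSketch {
    interface Partitioning {}
    static final class KeyGroupedPartitioning implements Partitioning {}
    static final class UnknownPartitioning implements Partitioning {}
    static final class CustomPartitioning implements Partitioning {} // e.g. connector-specific

    // Strict handling: anything unrecognized is rejected (the behavior reported above).
    static String strict(Partitioning p) {
        if (p instanceof KeyGroupedPartitioning) return "key-grouped";
        if (p instanceof UnknownPartitioning) return "unknown";
        throw new IllegalArgumentException(
            "Unsupported data source V2 partitioning type: " + p.getClass().getSimpleName());
    }

    // Relaxed handling: unrecognized types fall back to the "unknown" treatment.
    static String relaxed(Partitioning p) {
        if (p instanceof KeyGroupedPartitioning) return "key-grouped";
        return "unknown";
    }

    public static void main(String[] args) {
        Partitioning custom = new CustomPartitioning();
        System.out.println(relaxed(custom)); // unknown
        try {
            strict(custom);
        } catch (IllegalArgumentException e) {
            // Unsupported data source V2 partitioning type: CustomPartitioning
            System.out.println(e.getMessage());
        }
    }
}
```

The design choice is to degrade gracefully: a partitioning report that cannot be interpreted carries no usable information for planning, so treating it like UnknownPartitioning is the conservative fallback, at the cost of hiding connector mistakes that strict checking would surface.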
[jira] [Created] (SPARK-39769) Rename trait Unevaluable
Ted Yu created SPARK-39769:
Summary: Rename trait Unevaluable
Key: SPARK-39769
URL: https://issues.apache.org/jira/browse/SPARK-39769
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.3.0
Reporter: Ted Yu
I came across `trait Unevaluable`, which is defined in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala.
Unevaluable is not a word. There are `valuable` and `invaluable`, but I have never seen Unevaluable.
This issue renames the trait to Unevaluatable.
[jira] [Updated] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running
[ https://issues.apache.org/jira/browse/SPARK-38998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated SPARK-38998:
Description:
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems the MetricsSystem was somehow stopped between the start() and getServletHandlers() calls.
SparkContext should check that the MetricsSystem is running before calling getServletHandlers().
was:
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems SparkContext should wait until the MetricsSystem is running before calling getServletHandlers().
> Call to MetricsSystem#getServletHandlers() may take place before
> MetricsSystem becomes running
>
> Key: SPARK-38998
> URL: https://issues.apache.org/jira/browse/SPARK-38998
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2
> Reporter: Ted Yu
> Priority: Major
>
> Sometimes the following exception is observed:
> {code}
> java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
>     at scala.Predef$.require(Predef.scala:281)
>     at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
>     at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> {code}
> It seems the MetricsSystem was somehow stopped between the start() and
> getServletHandlers() calls.
> SparkContext should check that the MetricsSystem is running before calling
> getServletHandlers().
--
This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running
[ https://issues.apache.org/jira/browse/SPARK-38998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated SPARK-38998:
Description:
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems SparkContext should wait until the MetricsSystem is running before calling getServletHandlers().
was:
Sometimes the following exception is observed:
```
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
```
It seems SparkContext should wait until the MetricsSystem is running before calling getServletHandlers().
> Call to MetricsSystem#getServletHandlers() may take place before
> MetricsSystem becomes running
>
> Key: SPARK-38998
> URL: https://issues.apache.org/jira/browse/SPARK-38998
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2
> Reporter: Ted Yu
> Priority: Major
>
> Sometimes the following exception is observed:
> {code}
> java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
>     at scala.Predef$.require(Predef.scala:281)
>     at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
>     at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> {code}
> It seems SparkContext should wait until the MetricsSystem is running
> before calling getServletHandlers().
[jira] [Created] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running
Ted Yu created SPARK-38998:
Summary: Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running
Key: SPARK-38998
URL: https://issues.apache.org/jira/browse/SPARK-38998
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.2
Reporter: Ted Yu
Sometimes the following exception is observed:
```
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:597)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
```
It seems SparkContext should wait until the MetricsSystem is running before calling getServletHandlers().
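A minimal Java sketch of the caller-side check suggested in SPARK-38998, assuming a hypothetical `MetricsSystemLike` class rather than Spark's MetricsSystem. The check and the fetch happen under the same lock, so a concurrent stop() cannot land between them. Whether SparkContext should skip, wait, or fail when the system is not running is a design choice the issue leaves open (the earlier description proposed waiting, the later one checking); this sketch simply skips.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a component like MetricsSystem; not Spark's class.
public class MetricsGuardSketch {
    static final class MetricsSystemLike {
        private boolean running = false;

        synchronized void start() { running = true; }
        synchronized void stop() { running = false; }
        synchronized boolean isRunning() { return running; }

        synchronized List<String> getServletHandlers() {
            if (!running) {
                throw new IllegalArgumentException(
                    "requirement failed: Can only call getServletHandlers on a running MetricsSystem");
            }
            return List.of("hypothetical-metrics-handler");
        }
    }

    // Caller-side guard: hold the instance lock so the check and the fetch are atomic
    // with respect to start()/stop(), which synchronize on the same object.
    static List<String> servletHandlersIfRunning(MetricsSystemLike ms) {
        synchronized (ms) {
            return ms.isRunning() ? ms.getServletHandlers() : Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        MetricsSystemLike ms = new MetricsSystemLike();
        System.out.println(servletHandlersIfRunning(ms)); // []
        ms.start();
        System.out.println(servletHandlersIfRunning(ms)); // [hypothetical-metrics-handler]
    }
}
```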
[jira] [Commented] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'
[ https://issues.apache.org/jira/browse/SPARK-34532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17290646#comment-17290646 ]
Ted Yu commented on SPARK-34532:
Included the test command and some more information in the description. You should see these errors when you run the command.
> IntervalUtils.add() may result in 'long overflow'
>
> Key: SPARK-34532
> URL: https://issues.apache.org/jira/browse/SPARK-34532
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.2
> Reporter: Ted Yu
> Priority: Major
>
> I noticed the following when running the test suite:
> build/sbt "sql/testOnly *SQLQueryTestSuite"
> {code}
> 19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 6416.0 failed 1 times; aborting job
> [info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
> 19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 6476.0 (TID 7789)
> java.lang.ArithmeticException: long overflow
>     at java.lang.Math.multiplyExact(Math.java:892)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> {code}
> {code}
> 19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 14744.0 (TID 16705)
> java.lang.ArithmeticException: long overflow
>     at java.lang.Math.addExact(Math.java:809)
>     at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
>     at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
>     at org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
>     at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
> {code}
> This was likely caused by the following line:
> {code}
> val microseconds = left.microseconds + right.microseconds
> {code}
> We should check whether the addition would produce overflow before adding.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'
[ https://issues.apache.org/jira/browse/SPARK-34532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated SPARK-34532:
Description:
I noticed the following when running the test suite:
build/sbt "sql/testOnly *SQLQueryTestSuite"
{code}
19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 6416.0 failed 1 times; aborting job
[info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 6476.0 (TID 7789)
java.lang.ArithmeticException: long overflow
    at java.lang.Math.multiplyExact(Math.java:892)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
{code}
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
    at java.lang.Math.addExact(Math.java:809)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This was likely caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.
was:
I noticed the following when running the test suite:
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
    at java.lang.Math.addExact(Math.java:809)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This was likely caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.
> IntervalUtils.add() may result in 'long overflow'
>
> Key: SPARK-34532
> URL: https://issues.apache.org/jira/browse/SPARK-34532
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.2
> Reporter: Ted Yu
> Priority: Major
>
> I noticed the following when running the test suite:
> build/sbt "sql/testOnly *SQLQueryTestSuite"
> {code}
> 19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 6416.0 failed 1 times; aborting job
> [info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
> 19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 6476.0 (TID 7789)
> java.lang.ArithmeticException: long overflow
>     at java.lang.Math.multiplyExact(Math.java:892)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>     at
[jira] [Created] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'
Ted Yu created SPARK-34532:
Summary: IntervalUtils.add() may result in 'long overflow'
Key: SPARK-34532
URL: https://issues.apache.org/jira/browse/SPARK-34532
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.0.2
Reporter: Ted Yu
I noticed the following when running the test suite:
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
    at java.lang.Math.addExact(Math.java:809)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
    at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This was likely caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.
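The check suggested at the end of SPARK-34532 can be sketched in Java with the usual two's-complement test: a sum overflows exactly when both operands share a sign and the result has the opposite sign. The class and method names are hypothetical. `Math.addExact`, which appears in the stack trace above, performs an equivalent check and throws ArithmeticException with the message "long overflow".

```java
// Minimal sketch of checking for overflow before adding two microsecond counts,
// in the spirit of the fix suggested for IntervalUtils.add(). Names are hypothetical.
public class IntervalAddSketch {
    /** True if a + b does not fit in a long (two's-complement sign test). */
    static boolean additionOverflows(long a, long b) {
        long r = a + b;  // Java long addition wraps silently on overflow
        // Overflow iff both operands have the same sign and the result's sign differs.
        return ((a ^ r) & (b ^ r)) < 0;
    }

    static long addMicros(long left, long right) {
        if (additionOverflows(left, right)) {
            throw new ArithmeticException(
                "interval microseconds overflow: " + left + " + " + right);
        }
        return left + right;
    }

    public static void main(String[] args) {
        System.out.println(addMicros(1_000_000L, 2_500_000L));     // 3500000
        System.out.println(additionOverflows(Long.MAX_VALUE, 1L)); // true
    }
}
```

Checking before adding lets the caller raise an error that names the interval field involved, instead of surfacing a generic "long overflow" from deep inside expression evaluation.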
[jira] [Commented] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences
[ https://issues.apache.org/jira/browse/SPARK-34476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17287403#comment-17287403 ] Ted Yu commented on SPARK-34476: The basic jsonb test is here: https://github.com/yugabyte/yugabyte-db/blob/master/java/yb-cql-4x/src/test/java/org/yb/loadtest/TestSpark3Jsonb.java
I am working on adding a get_json_string() function (via a Spark extension), which is similar to get_json_object() but extracts the last jsonb field using '->>' (returning text) instead of '->'.
> Duplicate referenceNames are given for ambiguousReferences > -- > > Key: SPARK-34476 > URL: https://issues.apache.org/jira/browse/SPARK-34476 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Ted Yu >Priority: Major > > When running test with Spark extension that converts custom function to json > path expression, I saw the following in test output: > {code} > 2021-02-19 21:57:24,550 (Time-limited test) [INFO - > org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is > == Physical Plan == > org.apache.spark.sql.AnalysisException: Reference > 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: > mycatalog.test.person.phone->'key'->1->'m'->2->>'b', > mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8 > {code} > Please note the candidates following 'could be' are the same. 
> Here is the physical plan for a working query where phone is a jsonb column: > {code} > TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], > output=[id#6,address#7,key#0]) > +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0] >+- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra > Scan: test.person > - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]] > - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b'] > {code} > The difference for the failed query is that it tries to use > {code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as > part of filter). -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
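The get_json_object() / get_json_string() difference described in the comment comes down to the operator used for the final step. A minimal sketch, assuming simple paths of the form `$.key[1].m[2].b` and that the extension rewrites them into the operator chains seen in this issue (for example `phone->'key'->1->'m'->2->>'b'`). `JsonbPathSketch.toJsonbPath` is a hypothetical helper, not the extension's actual code.

```scala
object JsonbPathSketch {
  // One step of a simple JSONPath: ".name" (object key) or "[n]" (array index).
  // Assumption: only these two step forms occur; any other characters are ignored.
  private val Step = """\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]""".r

  // Hypothetical helper: appends an operator chain for jsonPath to the column name.
  // Object keys become ->'key', array indexes become ->n.
  // With lastAsText = true the final step uses ->> (text result, the get_json_string idea)
  // instead of -> (jsonb result, as in the get_json_object conversion).
  def toJsonbPath(column: String, jsonPath: String, lastAsText: Boolean): String = {
    require(jsonPath.startsWith("$"), s"path must start with '$$': $jsonPath")
    val steps = Step.findAllMatchIn(jsonPath.drop(1)).map { m =>
      if (m.group(1) != null) s"'${m.group(1)}'" else m.group(2)
    }.toList
    val ops = steps.zipWithIndex.map { case (step, i) =>
      val op = if (lastAsText && i == steps.size - 1) "->>" else "->"
      op + step
    }
    column + ops.mkString
  }
}
```

With `lastAsText = false` the result matches the projected column in the working plan (`phone->'key'->1->'m'->2->'b'`); with `lastAsText = true` it matches the filter expression (`phone->'key'->1->'m'->2->>'b'`).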
[jira] [Updated] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences
[ https://issues.apache.org/jira/browse/SPARK-34476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated SPARK-34476: --- Description: When running test with Spark extension that converts custom function to json path expression, I saw the following in test output: {code} 2021-02-19 21:57:24,550 (Time-limited test) [INFO - org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is == Physical Plan == org.apache.spark.sql.AnalysisException: Reference 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: mycatalog.test.person.phone->'key'->1->'m'->2->>'b', mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8 {code} Please note the candidates following 'could be' are the same. Here is the physical plan for a working query where phone is a jsonb column: {code} TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], output=[id#6,address#7,key#0]) +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0] +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra Scan: test.person - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]] - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b'] {code} The difference for the failed query is that it tries to use {code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as part of filter). was: When running test with Spark extension that converts custom function to json path expression, I saw the following in test output: {code} 2021-02-19 21:57:24,550 (Time-limited test) [INFO - org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is == Physical Plan == org.apache.spark.sql.AnalysisException: Reference 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: mycatalog.test.person.phone->'key'->1->'m'->2->>'b', mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8 {code} Please note the candidates following 'could be' are the same. 
Here is the physical plan for a working query where phone is a jsonb column: {code} TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], output=[id#6,address#7,key#0]) +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0] +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra Scan: test.person - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]] - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b'] {code} The difference for the failed query is that it tries to use phone->'key'->1->'m'->2->>'b' in the projection (which works as part of filter). > Duplicate referenceNames are given for ambiguousReferences > -- > > Key: SPARK-34476 > URL: https://issues.apache.org/jira/browse/SPARK-34476 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Ted Yu >Priority: Major > > When running test with Spark extension that converts custom function to json > path expression, I saw the following in test output: > {code} > 2021-02-19 21:57:24,550 (Time-limited test) [INFO - > org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is > == Physical Plan == > org.apache.spark.sql.AnalysisException: Reference > 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: > mycatalog.test.person.phone->'key'->1->'m'->2->>'b', > mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8 > {code} > Please note the candidates following 'could be' are the same. 
> Here is the physical plan for a working query where phone is a jsonb column: > {code} > TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], > output=[id#6,address#7,key#0]) > +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0] >+- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra > Scan: test.person > - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]] > - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b'] > {code} > The difference for the failed query is that it tries to use > {code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as > part of filter). -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences
Ted Yu created SPARK-34476: -- Summary: Duplicate referenceNames are given for ambiguousReferences Key: SPARK-34476 URL: https://issues.apache.org/jira/browse/SPARK-34476 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: Ted Yu
When running a test with a Spark extension that converts a custom function to a JSON path expression, I saw the following in the test output:
{code}
2021-02-19 21:57:24,550 (Time-limited test) [INFO - org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Reference 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: mycatalog.test.person.phone->'key'->1->'m'->2->>'b', mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
{code}
Note that the candidates following 'could be' are identical.
Here is the physical plan for a working query, where phone is a jsonb column:
{code}
TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], output=[id#6,address#7,key#0])
+- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
   +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra Scan: test.person
 - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
 - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
{code}
The difference in the failing query is that it uses phone->'key'->1->'m'->2->>'b' in the projection (the same expression works as part of the filter).
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34017) Pass json column information via pruneColumns()
[ https://issues.apache.org/jira/browse/SPARK-34017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259423#comment-17259423 ] Ted Yu commented on SPARK-34017: For PushDownUtils#pruneColumns, I am experimenting with the following:
{code}
case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
  val JSONCapture = "get_json_object\\((.*), *(.*)\\)".r
  val jsonRootFields = ArrayBuffer[RootField]()
  // Visit every node of every projection and record a root field for each get_json_object call
  projects.foreach(_.foreach { f =>
    f.toString match {
      case JSONCapture(column, field) =>
        jsonRootFields += RootField(StructField(column, f.dataType, f.nullable),
          derivedFromAtt = false, prunedIfAnyChildAccessed = true)
      case _ => logDebug("else " + f)
    }
  })
  val rootFields = SchemaPruning.identifyRootFields(projects, filters) ++ jsonRootFields
{code}
> Pass json column information via pruneColumns() > --- > > Key: SPARK-34017 > URL: https://issues.apache.org/jira/browse/SPARK-34017 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushDownUtils#pruneColumns only passes root fields to > SupportsPushDownRequiredColumns implementation(s). > {code} > 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - > org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema > projection List(id#33, address#34, phone#36, get_json_object(phone#36, > $.code) AS get_json_object(phone, $.code)#37) > 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - > org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema > StructType(StructField(id,IntegerType,false), > StructField(address,StringType,true), StructField(phone,StringType,true)) > {code} > The first line shows projections and the second line shows the pruned schema. > We can see that get_json_object(phone#36, $.code) is filtered. This > expression retrieves field 'code' from phone json column. > We should allow json column information to be passed via pruneColumns(). 
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
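A standalone sketch of the capture step in the pruneColumns() experiment, applied to the expression strings from the debug log. In that experiment the captured column still carries the expression id (`phone#36`); this sketch also strips it so the name matches the schema field `phone`. `JsonRootFieldSketch.jsonColumnAndPath` is a hypothetical helper, and it assumes the first argument is a bare attribute and the path contains no commas or parentheses.

```scala
object JsonRootFieldSketch {
  // Matches the toString form of a get_json_object call, e.g. "get_json_object(phone#36, $.code)".
  // Scala regex patterns must match the whole string, so an aliased form
  // ("... AS get_json_object(phone, $.code)#37") does not match.
  private val JsonCapture = """get_json_object\(([^,]+), *([^)]*)\)""".r

  // Returns (root column name without the #exprId suffix, JSON path) when the string matches.
  def jsonColumnAndPath(exprString: String): Option[(String, String)] = exprString match {
    case JsonCapture(column, path) => Some((column.takeWhile(_ != '#'), path))
    case _                         => None
  }
}
```

For the projection shown in the log, `jsonColumnAndPath("get_json_object(phone#36, $.code)")` yields `Some(("phone", "$.code"))`, while plain attributes such as `address#34` yield `None`.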
[jira] [Updated] (SPARK-34017) Pass json column information via pruneColumns()
[ https://issues.apache.org/jira/browse/SPARK-34017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated SPARK-34017: --- Description: Currently PushDownUtils#pruneColumns only passes root fields to SupportsPushDownRequiredColumns implementation(s). {code} 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) AS get_json_object(phone, $.code)#37) 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema StructType(StructField(id,IntegerType,false), StructField(address,StringType,true), StructField(phone,StringType,true)) {code} The first line shows projections and the second line shows the pruned schema. We can see that get_json_object(phone#36, $.code) is filtered. This expression retrieves field 'code' from phone json column. We should allow json column information to be passed via pruneColumns(). was: Currently PushDownUtils#pruneColumns only passes root fields to SupportsPushDownRequiredColumns implementation(s). 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) AS get_json_object(phone, $.code)#37) 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema StructType(StructField(id,IntegerType,false), StructField(address,StringType,true), StructField(phone,StringType,true)) The first line shows projections and the second line shows the pruned schema. We can see that get_json_object(phone#36, $.code) is filtered. This expression retrieves field 'code' from phone json column. We should allow json column information to be passed via pruneColumns(). 
> Pass json column information via pruneColumns() > --- > > Key: SPARK-34017 > URL: https://issues.apache.org/jira/browse/SPARK-34017 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushDownUtils#pruneColumns only passes root fields to > SupportsPushDownRequiredColumns implementation(s). > {code} > 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - > org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema > projection List(id#33, address#34, phone#36, get_json_object(phone#36, > $.code) AS get_json_object(phone, $.code)#37) > 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - > org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema > StructType(StructField(id,IntegerType,false), > StructField(address,StringType,true), StructField(phone,StringType,true)) > {code} > The first line shows projections and the second line shows the pruned schema. > We can see that get_json_object(phone#36, $.code) is filtered. This > expression retrieves field 'code' from phone json column. > We should allow json column information to be passed via pruneColumns(). -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34017) Pass json column information via pruneColumns()
Ted Yu created SPARK-34017: -- Summary: Pass json column information via pruneColumns() Key: SPARK-34017 URL: https://issues.apache.org/jira/browse/SPARK-34017 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.1 Reporter: Ted Yu
Currently PushDownUtils#pruneColumns only passes root fields to SupportsPushDownRequiredColumns implementation(s).
{code}
2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) AS get_json_object(phone, $.code)#37)
2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema StructType(StructField(id,IntegerType,false), StructField(address,StringType,true), StructField(phone,StringType,true))
{code}
The first line shows the projections and the second line shows the pruned schema. We can see that get_json_object(phone#36, $.code) is filtered out: only the root column phone reaches the pruned schema, although this expression retrieves just the field 'code' from the phone JSON column.
We should allow JSON column information to be passed via pruneColumns().
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-33997) Running bin/spark-sql gives NoSuchMethodError
[ https://issues.apache.org/jira/browse/SPARK-33997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved SPARK-33997. Resolution: Cannot Reproduce Rebuilt Spark locally and the error was gone. > Running bin/spark-sql gives NoSuchMethodError > - > > Key: SPARK-33997 > URL: https://issues.apache.org/jira/browse/SPARK-33997 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > I ran 'mvn install -Phive -Phive-thriftserver -DskipTests' > Running bin/spark-sql gives the following error: > {code} > 21/01/05 00:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.spark.sql.internal.SharedState$.loadHiveConfFile$default$3()Lscala/collection/Map; > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > at > org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:934) > {code} > Scala version 2.12.10 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
[jira] [Created] (SPARK-33997) Running bin/spark-sql gives NoSuchMethodError
Ted Yu created SPARK-33997: -- Summary: Running bin/spark-sql gives NoSuchMethodError Key: SPARK-33997 URL: https://issues.apache.org/jira/browse/SPARK-33997 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.1 Reporter: Ted Yu
I ran 'mvn install -Phive -Phive-thriftserver -DskipTests'. Running bin/spark-sql then gives the following error:
{code}
21/01/05 00:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SharedState$.loadHiveConfFile$default$3()Lscala/collection/Map;
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:934)
{code}
Scala version: 2.12.10
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17257647#comment-17257647 ] Ted Yu commented on SPARK-33915: Here is sample code for capturing the column and fields in the downstream PredicatePushDown.scala:
{code}
private val JSONCapture = "`GetJsonObject\\((.*),(.*)\\)`".r

private def transformGetJsonObject(p: Predicate): Predicate = p match {
  case eq: sources.EqualTo =>
    eq.attribute match {
      case JSONCapture(column, field) =>
        // "phone#37" -> "phone": drop the expression id
        val colName = column.split("#")(0)
        // "$.code" -> "->'code'": skip the leading "$" segment
        val path = field.split("\\.").drop(1).map(n => "->'" + n + "'").mkString
        sources.EqualTo(colName + path, eq.value).asInstanceOf[Predicate]
      case _ => p // not a get_json_object column: keep the predicate unchanged
    }
  case _ => p // only EqualTo predicates are rewritten
}
{code}
> Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Assignee: Apache Spark >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated SPARK-33915: --- Comment: was deleted (was: Opened https://github.com/apache/spark/pull/30984) > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Assignee: Apache Spark >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33953) readImages test fails due to NoClassDefFoundError for ImageTypeSpecifier
Ted Yu created SPARK-33953: -- Summary: readImages test fails due to NoClassDefFoundError for ImageTypeSpecifier Key: SPARK-33953 URL: https://issues.apache.org/jira/browse/SPARK-33953 Project: Spark Issue Type: Test Components: ML Affects Versions: 3.0.1 Reporter: Ted Yu
From https://github.com/apache/spark/pull/30984/checks?check_run_id=1630709203 :
{code}
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 21.0 failed 1 times, most recent failure: Lost task 1.0 in stage 21.0 (TID 20) (fv-az212-589.internal.cloudapp.net executor driver): java.lang.NoClassDefFoundError: Could not initialize class javax.imageio.ImageTypeSpecifier
[info] 	at com.sun.imageio.plugins.png.PNGImageReader.getImageTypes(PNGImageReader.java:1531)
[info] 	at com.sun.imageio.plugins.png.PNGImageReader.readImage(PNGImageReader.java:1318)
{code}
It seems some dependency is missing.
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17257025#comment-17257025 ] Ted Yu commented on SPARK-33915: Opened https://github.com/apache/spark/pull/30984 > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389 ] Ted Yu edited comment on SPARK-33915 at 12/31/20, 3:16 PM: --- Here is the plan prior to predicate pushdown: {code} 2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- Filter (get_json_object(phone#37, $.code) = 1200) +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [] - Requested Columns: [id,address,phone] {code} Here is the plan with pushdown: {code} 2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [[phone->'code' = ?, 1200]] - Requested Columns: [id,address,phone] {code} was (Author: yuzhih...@gmail.com): Here is the plan prior to predicate pushdown: {code} 2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- Filter (get_json_object(phone#37, $.phone) = 1200) +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [] - Requested Columns: [id,address,phone] {code} Here is the plan with pushdown: {code} 2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS 
FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [[phone->'phone' = ?, 1200]] - Requested Columns: [id,address,phone] {code} > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389 ] Ted Yu edited comment on SPARK-33915 at 12/29/20, 6:51 PM: --- Here is the plan prior to predicate pushdown: {code} 2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- Filter (get_json_object(phone#37, $.phone) = 1200) +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [] - Requested Columns: [id,address,phone] {code} Here is the plan with pushdown: {code} 2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [[phone->'phone' = ?, 1200]] - Requested Columns: [id,address,phone] {code} was (Author: yuzhih...@gmail.com): Here is the plan prior to predicate pushdown: {code} 2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- Filter (get_json_object(phone#37, $.phone) = 1200) +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [] - Requested Columns: [id,address,phone] {code} Here is the plan with pushdown: {code} 2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS 
FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [["`GetJsonObject(phone#37,$.phone)`" = ?, 1200]] - Requested Columns: [id,address,phone] {code} > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255631#comment-17255631 ] Ted Yu commented on SPARK-33915: [~codingcat] [~XuanYuan] [~viirya] [~Alex Herman] Could you provide your comments? Thanks > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389 ] Ted Yu commented on SPARK-33915: Here is the plan prior to predicate pushdown: {code} 2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- Filter (get_json_object(phone#37, $.phone) = 1200) +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [] - Requested Columns: [id,address,phone] {code} Here is the plan with pushdown: {code} 2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0 +- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS phone#33] +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person - Cassandra Filters: [["`GetJsonObject(phone#37,$.phone)`" = ?, 1200]] - Requested Columns: [id,address,phone] {code} > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. 
> This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
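In the second plan, the `Filter` node disappears because the source accepted the predicate. In Spark 3.0, the DataSource V2 hook for this is `SupportsPushDownFilters.pushFilters`. It returns the filters the source cannot handle, and Spark keeps those as post-scan filters. The Java sketch below mimics that contract without depending on Spark. `SketchScanBuilder`, its nested `EqualTo`, and the `canEvaluate` predicate are hypothetical stand-ins, not Spark classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a source's scan builder implementing the
// pushFilters/pushedFilters contract of Spark's SupportsPushDownFilters.
final class SketchScanBuilder {
    // Simplified source filter "attribute = value". The attribute may be a
    // plain column or an encoded JSON expression such as
    // "GetJsonObject(phone#37,$.phone)".
    static final class EqualTo {
        final String attribute;
        final Object value;

        EqualTo(String attribute, Object value) {
            this.attribute = attribute;
            this.value = value;
        }
    }

    private final Predicate<String> canEvaluate;
    private final List<EqualTo> pushed = new ArrayList<>();

    SketchScanBuilder(Predicate<String> canEvaluate) {
        this.canEvaluate = canEvaluate;
    }

    // Keeps the filters the backend can evaluate and returns the rest,
    // which Spark would then apply after the scan.
    List<EqualTo> pushFilters(List<EqualTo> filters) {
        List<EqualTo> postScan = new ArrayList<>();
        for (EqualTo f : filters) {
            if (canEvaluate.test(f.attribute)) {
                pushed.add(f);
            } else {
                postScan.add(f);
            }
        }
        return postScan;
    }

    // Reports what was accepted (what the plan prints as "Cassandra Filters").
    List<EqualTo> pushedFilters() {
        return new ArrayList<>(pushed);
    }
}
```

Returning the unhandled filters instead of dropping them keeps query results correct when the backend supports only some predicates.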
[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944 ] Ted Yu edited comment on SPARK-33915 at 12/26/20, 4:14 AM: --- I have experimented with two patches which allow json expression to be pushable column (without the presence of cast). The first involves duplicating GetJsonObject as GetJsonString where eval() method returns UTF8String. FunctionRegistry.scala and functions.scala would have corresponding change. In PushableColumnBase class, new case for GetJsonString is added. Since code duplication is undesirable and case class cannot be directly subclassed, there is more work to be done in this direction. The second is simpler. The return type of GetJsonObject#eval is changed to UTF8String. I have run through catalyst / sql core unit tests which passed. In PushableColumnBase class, new case for GetJsonObject is added. In test output, I observed the following (for second patch, output for first patch is similar): {code} Post-Scan Filters: (get_json_object(phone#37, $.code) = 1200) {code} Comment on the two approaches (and other approach) is welcome. Below is snippet for second approach. 
{code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c22b68890a..8004ddd735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression) @transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String]) - override def eval(input: InternalRow): Any = { + override def eval(input: InternalRow): UTF8String = { val jsonStr = json.eval(input).asInstanceOf[UTF8String] if (jsonStr == null) { return null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e4f001d61a..1cdc2642ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -723,6 +725,12 @@ abstract class PushableColumnBase { } case s: GetStructField if nestedPredicatePushdownEnabled => helper(s.child).map(_ :+ s.childSchema(s.ordinal).name) + case GetJsonObject(col, field) => +Some(Seq("GetJsonObject(" + col + "," + field + ")")) case _ => None } helper(e).map(_.quoted) {code} was (Author: yuzhih...@gmail.com): I have experimented with two patches which allow json expression to be pushable column (without the presence of cast). The first involves duplicating GetJsonObject as GetJsonString where eval() method returns UTF8String. FunctionRegistry.scala and functions.scala would have corresponding change. In PushableColumnBase class, new case for GetJsonString is added. 
Since code duplication is undesirable and case class cannot be directly subclassed, there is more work to be done in this direction. The second is simpler. The return type of GetJsonObject#eval is changed to UTF8String. I have run through catalyst / sql core unit tests which passed. In PushableColumnBase class, new case for GetJsonObject is added. In test output, I observed the following (for second patch, output for first patch is similar): {code} Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200) {code} Comment on the two approaches (and other approach) is welcome. Below is snippet for second approach. {code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c22b68890a..8004ddd735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression) @transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String]) - override def eval(input: InternalRow): Any = { + override def eval(input: InternalRow): UTF8String = { val jsonStr = json.eval(input).asInstanceOf[UTF8String] if (jsonStr == null) { return null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index
[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944 ] Ted Yu edited comment on SPARK-33915 at 12/26/20, 4:14 AM: --- I have experimented with two patches which allow json expression to be pushable column (without the presence of cast). The first involves duplicating GetJsonObject as GetJsonString where eval() method returns UTF8String. FunctionRegistry.scala and functions.scala would have corresponding change. In PushableColumnBase class, new case for GetJsonString is added. Since code duplication is undesirable and case class cannot be directly subclassed, there is more work to be done in this direction. The second is simpler. The return type of GetJsonObject#eval is changed to UTF8String. I have run through catalyst / sql core unit tests which passed. In PushableColumnBase class, new case for GetJsonObject is added. In test output, I observed the following (for second patch, output for first patch is similar): {code} Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200) {code} Comment on the two approaches (and other approach) is welcome. Below is snippet for second approach. 
{code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c22b68890a..8004ddd735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression) @transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String]) - override def eval(input: InternalRow): Any = { + override def eval(input: InternalRow): UTF8String = { val jsonStr = json.eval(input).asInstanceOf[UTF8String] if (jsonStr == null) { return null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e4f001d61a..1cdc2642ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -723,6 +725,12 @@ abstract class PushableColumnBase { } case s: GetStructField if nestedPredicatePushdownEnabled => helper(s.child).map(_ :+ s.childSchema(s.ordinal).name) + case GetJsonObject(col, field) => +Some(Seq("GetJsonObject(" + col + "," + field + ")")) case _ => None } helper(e).map(_.quoted) {code} was (Author: yuzhih...@gmail.com): I have experimented with two patches which allow json expression to be pushable column (without the presence of cast). The first involves duplicating GetJsonObject as GetJsonString where eval() method returns UTF8String. FunctionRegistry.scala and functions.scala would have corresponding change. In PushableColumnBase class, new case for GetJsonString is added. 
Since code duplication is undesirable and case class cannot be directly subclassed, there is more work to be done in this direction. The second is simpler. The return type of GetJsonObject#eval is changed to UTF8String. I have run through catalyst / sql core unit tests which passed. In PushableColumnBase class, new case for GetJsonObject is added. In test output, I observed the following (for second patch, output for first patch is similar): Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200) Comment on the two approaches (and other approach) is welcome. Below is snippet for second approach. {code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c22b68890a..8004ddd735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression) @transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String]) - override def eval(input: InternalRow): Any = { + override def eval(input: InternalRow): UTF8String = { val jsonStr = json.eval(input).asInstanceOf[UTF8String] if (jsonStr == null) { return null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944 ] Ted Yu commented on SPARK-33915: I have experimented with two patches that allow a json expression to be a pushable column (without the presence of cast). The first involves duplicating GetJsonObject as GetJsonString, whose eval() method returns UTF8String. FunctionRegistry.scala and functions.scala would need corresponding changes. In the PushableColumnBase class, a new case for GetJsonString is added. Since code duplication is undesirable and a case class cannot be directly subclassed, more work is needed in this direction. The second is simpler: the return type of GetJsonObject#eval is changed to UTF8String. I have run the catalyst / sql core unit tests, and they passed. In the PushableColumnBase class, a new case for GetJsonObject is added. In the test output, I observed the following (for the second patch; output for the first patch is similar): Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200) Comments on the two approaches (or any other approach) are welcome. Below is a snippet for the second approach. 
{code} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c22b68890a..8004ddd735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression) @transient private lazy val parsedPath = parsePath(path.eval().asInstanceOf[UTF8String]) - override def eval(input: InternalRow): Any = { + override def eval(input: InternalRow): UTF8String = { val jsonStr = json.eval(input).asInstanceOf[UTF8String] if (jsonStr == null) { return null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e4f001d61a..1cdc2642ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -723,6 +725,12 @@ abstract class PushableColumnBase { } case s: GetStructField if nestedPredicatePushdownEnabled => helper(s.child).map(_ :+ s.childSchema(s.ordinal).name) + case GetJsonObject(col, field) => +Some(Seq("GetJsonObject(" + col + "," + field + ")")) case _ => None } helper(e).map(_.quoted) {code} > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. 
> Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. > Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
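The new `case GetJsonObject(col, field)` branch turns the whole JSON expression into a single synthetic column name. It sits alongside the existing attribute and struct-field cases. The Java sketch below is a simplified model of that recursive helper. It assumes a hypothetical three-node expression tree (`ColRef`, `StructFieldRef`, `JsonPathRef`) in place of Catalyst's classes. Like the diff, it does not check that the JSON argument is a plain column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified model of PushableColumnBase's helper over a hypothetical tree.
final class PushableColumnSketch {
    interface Node {}

    // A column reference; toString mimics Catalyst's "name#exprId" display.
    static final class ColRef implements Node {
        final String name;
        ColRef(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Nested field access, e.g. address.city.
    static final class StructFieldRef implements Node {
        final Node child;
        final String field;
        StructFieldRef(Node child, String field) { this.child = child; this.field = field; }
    }

    // get_json_object(json, path).
    static final class JsonPathRef implements Node {
        final Node json;
        final String path;
        JsonPathRef(Node json, String path) { this.json = json; this.path = path; }
    }

    private final boolean nestedPredicatePushdownEnabled;

    PushableColumnSketch(boolean nestedPredicatePushdownEnabled) {
        this.nestedPredicatePushdownEnabled = nestedPredicatePushdownEnabled;
    }

    // Returns the name parts of a pushable column, or empty if not pushable.
    Optional<List<String>> helper(Node e) {
        if (e instanceof ColRef) {
            return Optional.of(List.of(((ColRef) e).name));
        }
        if (e instanceof StructFieldRef && nestedPredicatePushdownEnabled) {
            StructFieldRef s = (StructFieldRef) e;
            return helper(s.child).map(parts -> {
                List<String> out = new ArrayList<>(parts);
                out.add(s.field);
                return out;
            });
        }
        if (e instanceof JsonPathRef) {
            JsonPathRef j = (JsonPathRef) e;
            // One opaque part, mirroring "GetJsonObject(" + col + "," + field + ")".
            return Optional.of(List.of("GetJsonObject(" + j.json + "," + j.path + ")"));
        }
        return Optional.empty();
    }
}
```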
[jira] [Updated] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated SPARK-33915: --- Description: Currently PushableColumnBase provides no support for json / jsonb expression. Example of json expression: {code} get_json_object(phone, '$.code') = '1200' {code} If non-string literal is part of the expression, the presence of cast() would complicate the situation. Implication is that implementation of SupportsPushDownFilters doesn't have a chance to perform pushdown even if third party DB engine supports json expression pushdown. This issue is for discussion and implementation of Spark core changes which would allow json expression to be recognized as pushable column. was: Currently PushableColumnBase provides no support for json / jsonb expression. Example of json expression: get_json_object(phone, '$.code') = '1200' If non-string literal is part of the expression, the presence of cast() would complicate the situation. Implication is that implementation of SupportsPushDownFilters doesn't have a chance to perform pushdown even if third party DB engine supports json expression pushdown. This issue is for discussion and implementation of Spark core changes which would allow json expression to be recognized as pushable column. > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb expression. > Example of json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If non-string literal is part of the expression, the presence of cast() would > complicate the situation. 
> Implication is that implementation of SupportsPushDownFilters doesn't have a > chance to perform pushdown even if third party DB engine supports json > expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow json expression to be recognized as pushable column. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33915) Allow json expression to be pushable column
Ted Yu created SPARK-33915: -- Summary: Allow json expression to be pushable column Key: SPARK-33915 URL: https://issues.apache.org/jira/browse/SPARK-33915 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.1 Reporter: Ted Yu Currently PushableColumnBase provides no support for json / jsonb expressions. Example of a json expression: get_json_object(phone, '$.code') = '1200' If a non-string literal is part of the expression, the presence of cast() would complicate the situation. As a result, an implementation of SupportsPushDownFilters has no chance to perform pushdown, even if the third-party DB engine supports json expression pushdown. This issue is for discussion and implementation of the Spark core changes that would allow json expressions to be recognized as pushable columns. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
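For a json expression to be recognized as a pushable column, it must be expressible as a column name. The patch discussed in the comments on this issue encodes it as a synthetic name such as `GetJsonObject(phone#37,$.phone)` and then applies `quoted`. Because that name contains characters such as `(`, `#`, `$`, and `,`, it comes out wrapped in backticks. The Java sketch below re-implements a quote-if-needed rule modeled on Spark's `quoteIfNeeded` helper. The exact rule may differ between Spark versions, and `IdentifierQuoting` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of multi-part identifier quoting, modeled on Spark's quoteIfNeeded.
final class IdentifierQuoting {
    private IdentifierQuoting() {}

    // Word-like parts pass through unchanged; anything else (including an
    // all-digit part) is wrapped in backticks, with embedded backticks doubled.
    static String quoteIfNeeded(String part) {
        if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
            return part;
        }
        return "`" + part.replace("`", "``") + "`";
    }

    // Joins the parts of a (possibly nested) column reference with dots.
    static String quoted(List<String> parts) {
        return parts.stream()
                .map(IdentifierQuoting::quoteIfNeeded)
                .collect(Collectors.joining("."));
    }
}
```

A source that receives such a name must undo the quoting before it can interpret the embedded path.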
[jira] [Commented] (HBASE-20226) Performance Improvement Taking Large Snapshots In Remote Filesystems
[ https://issues.apache.org/jira/browse/HBASE-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166082#comment-17166082 ] Ted Yu commented on HBASE-20226: {code} +if (v1Regions.size() > 0 || v2Regions.size() > 0) { {code} It seems the thread pool is needed when v1Regions.size()+v2Regions.size() > 1. There are also a few findbugs warnings to be addressed. > Performance Improvement Taking Large Snapshots In Remote Filesystems > > > Key: HBASE-20226 > URL: https://issues.apache.org/jira/browse/HBASE-20226 > Project: HBase > Issue Type: Improvement > Components: snapshots >Affects Versions: 1.4.0 > Environment: HBase 1.4.0 running on an AWS EMR cluster with the > hbase.rootdir set to point to a folder in S3 >Reporter: Saad Mufti >Priority: Minor > Attachments: HBASE-20226..01.patch > > > When taking a snapshot of any table, one of the last steps is to delete the > region manifests, which have already been rolled up into a larger overall > manifest and thus have redundant information. > This proposal is to do the deletion in a thread pool bounded by > hbase.snapshot.thread.pool.max . For large tables with a lot of regions, the > current single threaded deletion is taking longer than all the rest of the > snapshot tasks when the Hbase data and the snapshot folder are both in a > remote filesystem like S3. > I have a patch for this proposal almost ready and will submit it tomorrow for > feedback, although I haven't had a chance to write any tests yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
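The review point is that a thread pool only helps when there is more than one manifest to delete. The Java sketch below shows that rule. It assumes a caller-supplied delete function and a thread cap standing in for `hbase.snapshot.thread.pool.max`. `ManifestCleanup` and `deleteAll` are hypothetical names, not HBase's snapshot code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class ManifestCleanup {
    private ManifestCleanup() {}

    // Deletes every path and returns the number of pool threads used
    // (0 when the work is done inline on the calling thread).
    static int deleteAll(List<String> paths, int maxThreads, Consumer<String> deleter) {
        if (maxThreads < 1) {
            throw new IllegalArgumentException("maxThreads must be >= 1");
        }
        if (paths.size() <= 1) {
            paths.forEach(deleter); // zero or one manifest: no pool needed
            return 0;
        }
        int threads = Math.min(maxThreads, paths.size()); // never more threads than tasks
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String path : paths) {
                futures.add(pool.submit(() -> deleter.accept(path)));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for each task; a failed task's exception is rethrown
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while deleting manifests", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("manifest deletion failed", e.getCause());
        } finally {
            pool.shutdownNow(); // no-op for finished tasks, cancels the rest on failure
        }
        return threads;
    }
}
```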
[jira] [Commented] (HBASE-24137) The max merge count of metafixer should be configurable in MetaFixer
[ https://issues.apache.org/jira/browse/HBASE-24137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078854#comment-17078854 ] Ted Yu commented on HBASE-24137: Please find people who have touched this class to review. I may not have time to review. > The max merge count of metafixer should be configurable in MetaFixer > > > Key: HBASE-24137 > URL: https://issues.apache.org/jira/browse/HBASE-24137 > Project: HBase > Issue Type: Improvement >Affects Versions: 3.0.0 >Reporter: Yu Wang >Priority: Minor > Fix For: 3.0.0 > > Attachments: 24137_master_1.patch > > > The max merge count of metafixer should be configurable in MetaFixer -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032739#comment-17032739 ] Ted Yu commented on KAFKA-9504: --- It seems that closing the metrics is not enough to prevent the memory leak: {code} Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); Utils.closeQuietly(metrics, "consumer metrics", firstException); {code} > Memory leak in KafkaMetrics registered to MBean > --- > > Key: KAFKA-9504 > URL: https://issues.apache.org/jira/browse/KAFKA-9504 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: Andreas Holmén >Priority: Major > > After close() is called on a KafkaConsumer, some registered MBeans are not > unregistered, causing a leak. > > > {code:java} > import static > org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; > import java.lang.management.ManagementFactory; > import java.util.HashMap; > import java.util.Map; > import javax.management.MBeanServer; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.serialization.ByteArrayDeserializer; > public class Leaker { > private static String bootstrapServers = "hostname:9092"; > > public static void main(String[] args) throws InterruptedException { > MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); > Map<String, Object> props = new HashMap<>(); > props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); > > int beans = mBeanServer.getMBeanCount(); > for (int i = 0; i < 100; i++) { >KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new > ByteArrayDeserializer(), new ByteArrayDeserializer()); >consumer.close(); > } > int newBeans = mBeanServer.getMBeanCount(); > System.out.println("\nbeans delta: " + (newBeans - beans)); > } > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
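The reporter's `Leaker` compares `getMBeanCount()` across the whole platform MBean server. Narrowing the count to the consumer's JMX domain can make such a check less noisy. The sketch below assumes Kafka consumer MBeans are registered under the `kafka.consumer` domain. `MBeanLeakCheck` is a hypothetical helper built only on `javax.management`.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class MBeanLeakCheck {
    private MBeanLeakCheck() {}

    // Number of MBeans on the platform server matching an ObjectName
    // pattern such as "kafka.consumer:*".
    static int count(String pattern) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            return server.queryNames(new ObjectName(pattern), null).size();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid ObjectName pattern: " + pattern, e);
        }
    }

    // Runs the action and returns how many matching MBeans it left behind;
    // a positive result after creating and closing clients suggests a leak.
    static int delta(String pattern, Runnable action) {
        int before = count(pattern);
        action.run();
        return count(pattern) - before;
    }
}
```

For example, wrapping the reporter's create-and-close loop in `MBeanLeakCheck.delta("kafka.consumer:*", ...)` would be expected to return 0 once the leak is fixed.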
[jira] [Resolved] (KAFKA-9471) Throw exception for DEAD StreamThread.State
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved KAFKA-9471. --- Resolution: Duplicate > Throw exception for DEAD StreamThread.State > --- > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
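The proposal replaces the empty list for `DEAD` with an exception, because a dead thread will never serve the store again. The Java sketch below shows that behavior. It uses a hypothetical `StoreUnavailableException` with a `retriable` flag instead of Kafka's actual exception types, and a reduced state enum instead of the full `StreamThread.State`.

```java
import java.util.Collections;
import java.util.List;

final class DeadThreadStoreLookup {
    private DeadThreadStoreLookup() {}

    // Reduced stand-in for StreamThread.State.
    enum State { RUNNING, PARTITIONS_ASSIGNED, DEAD }

    // Hypothetical exception; 'retriable' tells the caller whether retrying can help.
    static final class StoreUnavailableException extends RuntimeException {
        final boolean retriable;

        StoreUnavailableException(String message, boolean retriable) {
            super(message);
            this.retriable = retriable;
        }
    }

    static List<String> stores(State state, List<String> available) {
        if (state == State.DEAD) {
            // Proposed change: fail loudly instead of returning an empty list.
            throw new StoreUnavailableException("stream thread is DEAD", false);
        }
        if (state != State.RUNNING) {
            // Other states (e.g. PARTITIONS_ASSIGNED during a rebalance) are
            // treated as retriable in this sketch.
            throw new StoreUnavailableException("stream thread is " + state, true);
        }
        return Collections.unmodifiableList(available);
    }
}
```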
[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022970#comment-17022970 ] Ted Yu commented on KAFKA-9471: --- [~vitojeng] Please let me know whether I should proceed with this, or whether you plan to include the exception throwing in KIP-216. > Throw exception for DEAD StreamThread.State > --- > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022595#comment-17022595 ] Ted Yu commented on KAFKA-9471: --- [~vitojeng] It looks like the following test depends on the empty collection being returned for the DEAD state: {code} @Test public void shouldAllowToQueryAfterThreadDied() throws Exception { {code} It fails when the exception is thrown. What do you think? > Throw exception for DEAD StreamThread.State > --- > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9471) Throw exception for DEAD StreamThread.State
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-9471: -- Comment: was deleted (was: I will send out a PR soon along the above line of thinking.) > Throw exception for DEAD StreamThread.State > --- > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9471) Throw exception for DEAD StreamThread.State
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-9471: -- Summary: Throw exception for DEAD StreamThread.State (was: Return empty collection for PENDING_SHUTDOWN) > Throw exception for DEAD StreamThread.State > --- > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-9471: -- Description: In StreamThreadStateStoreProvider we have: {code} if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); {code} If user cannot retry anymore, we should throw exception which is handled in the else block. was: In StreamThreadStateStoreProvider we have: {code} if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); {code} PENDING_SHUTDOWN is precursor to DEAD state. PENDING_SHUTDOWN should be treated the same way as DEAD. This makes more sense than current behavior of throwing exception for PENDING_SHUTDOWN. > Return empty collection for PENDING_SHUTDOWN > > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > {code} > If user cannot retry anymore, we should throw exception which is handled in > the else block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022589#comment-17022589 ] Ted Yu commented on KAFKA-9471: --- I will send out a PR soon along the above line of thinking. > Return empty collection for PENDING_SHUTDOWN > > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > } > {code} > PENDING_SHUTDOWN is a precursor to the DEAD state. > PENDING_SHUTDOWN should be treated the same way as DEAD. > This makes more sense than the current behavior of throwing an exception for > PENDING_SHUTDOWN. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN
Ted Yu created KAFKA-9471: - Summary: Return empty collection for PENDING_SHUTDOWN Key: KAFKA-9471 URL: https://issues.apache.org/jira/browse/KAFKA-9471 Project: Kafka Issue Type: Improvement Reporter: Ted Yu Assignee: Ted Yu In StreamThreadStateStoreProvider we have: {code} if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } {code} PENDING_SHUTDOWN should be treated the same way as DEAD. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-9471: -- Description: In StreamThreadStateStoreProvider we have: {code} if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } {code} PENDING_SHUTDOWN is a precursor to the DEAD state. PENDING_SHUTDOWN should be treated the same way as DEAD. This makes more sense than the current behavior of throwing an exception for PENDING_SHUTDOWN. was: In StreamThreadStateStoreProvider we have: {code} if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } {code} PENDING_SHUTDOWN should be treated the same way as DEAD. > Return empty collection for PENDING_SHUTDOWN > > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > } > {code} > PENDING_SHUTDOWN is a precursor to the DEAD state. > PENDING_SHUTDOWN should be treated the same way as DEAD. > This makes more sense than the current behavior of throwing an exception for > PENDING_SHUTDOWN. -- This message was sent by Atlassian Jira (v8.3.4#803005)
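For comparison, this earlier version of the proposal (treat PENDING_SHUTDOWN like DEAD and return an empty collection) might look like the sketch below. Again, `ThreadState` and `stores(...)` are hypothetical, simplified stand-ins, and `IllegalStateException` merely stands in for whatever exception the real provider throws for the remaining non-RUNNING states.

```java
import java.util.Collections;
import java.util.List;

public class ShutdownAwareStoreLookup {

    // Hypothetical, simplified stand-in for StreamThread.State.
    enum ThreadState { STARTING, RUNNING, PENDING_SHUTDOWN, DEAD }

    static List<String> stores(ThreadState state, List<String> localStores) {
        // PENDING_SHUTDOWN is the precursor to DEAD, so both get an empty list.
        if (state == ThreadState.DEAD || state == ThreadState.PENDING_SHUTDOWN) {
            return Collections.emptyList();
        }
        if (state == ThreadState.RUNNING) {
            return Collections.unmodifiableList(localStores);
        }
        // Any other state (here only STARTING) still throws, signalling "retry later".
        throw new IllegalStateException("Stream thread is " + state + ", not RUNNING");
    }

    public static void main(String[] args) {
        List<String> local = Collections.singletonList("store-a");
        System.out.println(stores(ThreadState.PENDING_SHUTDOWN, local)); // []
        System.out.println(stores(ThreadState.RUNNING, local));          // [store-a]
    }
}
```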
[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN
[ https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022373#comment-17022373 ] Ted Yu commented on KAFKA-9471: --- [~mjsax][~guozhang] Please comment. > Return empty collection for PENDING_SHUTDOWN > > > Key: KAFKA-9471 > URL: https://issues.apache.org/jira/browse/KAFKA-9471 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThreadStateStoreProvider we have: > {code} > if (streamThread.state() == StreamThread.State.DEAD) { > return Collections.emptyList(); > } > {code} > PENDING_SHUTDOWN is a precursor to the DEAD state. > PENDING_SHUTDOWN should be treated the same way as DEAD. > This makes more sense than the current behavior of throwing an exception for > PENDING_SHUTDOWN. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9464) Close the producer in completeShutdown
[ https://issues.apache.org/jira/browse/KAFKA-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved KAFKA-9464. --- Resolution: Not A Problem > Close the producer in completeShutdown > -- > > Key: KAFKA-9464 > URL: https://issues.apache.org/jira/browse/KAFKA-9464 > Project: Kafka > Issue Type: Bug >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In StreamThread#completeShutdown, the producer (if not null) should be closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException
Ted Yu created KAFKA-9465: - Summary: Enclose consumer call with catching InvalidOffsetException Key: KAFKA-9465 URL: https://issues.apache.org/jira/browse/KAFKA-9465 Project: Kafka Issue Type: Improvement Reporter: Ted Yu In maybeUpdateStandbyTasks, the try block encloses restoreConsumer.poll and record handling. Since InvalidOffsetException is thrown by restoreConsumer.poll, we should enclose only this call in the try block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
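One reading of this proposal is that the try block should be narrowed to the `restoreConsumer.poll` call, with record handling moved after it. The sketch below shows that shape in plain Java. `RestoreConsumer`, `InvalidOffsetException` and `maybeUpdateStandbyTasks` here are hypothetical stand-ins (the real exception lives in the Kafka consumer client), the upper-casing loop is placeholder "record handling", and the recovery action in the catch block is only a placeholder as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class NarrowTryBlock {

    // Hypothetical stand-in for the consumer's InvalidOffsetException.
    static class InvalidOffsetException extends RuntimeException {
        InvalidOffsetException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the restore consumer; only poll() is modeled.
    interface RestoreConsumer {
        List<String> poll();
    }

    static List<String> maybeUpdateStandbyTasks(RestoreConsumer restoreConsumer) {
        final List<String> records;
        try {
            // Only the call that can throw InvalidOffsetException is guarded.
            records = restoreConsumer.poll();
        } catch (InvalidOffsetException e) {
            // Placeholder recovery: a real implementation would reset offsets here.
            return Collections.emptyList();
        }
        // Record handling sits outside the try block.
        List<String> handled = new ArrayList<>();
        for (String record : records) {
            handled.add(record.toUpperCase(Locale.ROOT));
        }
        return handled;
    }

    public static void main(String[] args) {
        RestoreConsumer healthy = () -> Arrays.asList("a", "b");
        RestoreConsumer outOfRange = () -> {
            throw new InvalidOffsetException("offset out of range");
        };
        System.out.println(maybeUpdateStandbyTasks(healthy));    // [A, B]
        System.out.println(maybeUpdateStandbyTasks(outOfRange)); // []
    }
}
```

Keeping the try block this narrow makes it explicit which call the catch clause is guarding.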
[jira] [Created] (KAFKA-9464) Close the producer in completeShutdown
Ted Yu created KAFKA-9464: - Summary: Close the producer in completeShutdown Key: KAFKA-9464 URL: https://issues.apache.org/jira/browse/KAFKA-9464 Project: Kafka Issue Type: Bug Reporter: Ted Yu In StreamThread#completeShutdown, the producer (if not null) should be closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9463) Transient failure in KafkaAdminClientTest.testListOffsets
Ted Yu created KAFKA-9463: - Summary: Transient failure in KafkaAdminClientTest.testListOffsets Key: KAFKA-9463 URL: https://issues.apache.org/jira/browse/KAFKA-9463 Project: Kafka Issue Type: Test Reporter: Ted Yu When running tests with Java 11, I got the following test failure: {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testListOffsets(KafkaAdminClientTest.java:2336) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. {code} KafkaAdminClientTest.testListOffsets passes when it is run alone. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9462) Correct exception message in DistributedHerder
Ted Yu created KAFKA-9462: - Summary: Correct exception message in DistributedHerder Key: KAFKA-9462 URL: https://issues.apache.org/jira/browse/KAFKA-9462 Project: Kafka Issue Type: Bug Reporter: Ted Yu There are a few exception messages in DistributedHerder which were copied from other exception messages. This task corrects the messages to reflect the actual conditions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9461) Limit DEBUG statement size when logging failed record value
[ https://issues.apache.org/jira/browse/KAFKA-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020262#comment-17020262 ] Ted Yu commented on KAFKA-9461: --- Since we may not see the complete record under any size-limit threshold, we can use a hardcoded value when the limit is added. > Limit DEBUG statement size when logging failed record value > --- > > Key: KAFKA-9461 > URL: https://issues.apache.org/jira/browse/KAFKA-9461 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Nicolas Guyomar >Priority: Minor > > Hi, > With the current implementation, it is possible that we log the full record > content at DEBUG level, which can overwhelm the log4j buffer and cause an OOM: > This stack trace was caused by a 70MB message rejected by a broker > {code:java} > java.lang.OutOfMemoryError: Java heap space > at java.util.Arrays.copyOf(Arrays.java:3332) > at > java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) > at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) > at java.lang.StringBuffer.append(StringBuffer.java:270) > at > org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at > org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252) > at > 
org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code} > in > [https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348] > Would it make sense to protect Connect directly in the ConnectRecord > toString() method and set a configurable limit? > Thank you > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
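A hardcoded limit, as suggested in the comment, could be applied with a helper along these lines before the value reaches the logger. This is a sketch, not Connect's code: `MAX_LOGGED_CHARS`, its value of 1024, and `truncateForLog` are all hypothetical. Note that `String.valueOf` still materializes the full string once, so this mainly shrinks what log4j then has to copy while formatting; avoiding even that allocation would require bounding the output where the string is built, for example in `ConnectRecord.toString()` as the reporter suggests.

```java
public class TruncatedLogValue {

    // Hypothetical hardcoded cap on how many characters of a record value are logged.
    static final int MAX_LOGGED_CHARS = 1024;

    // Renders value for a log statement, keeping at most maxChars characters
    // and appending how many characters were dropped.
    static String truncateForLog(Object value, int maxChars) {
        String s = String.valueOf(value);
        if (s.length() <= maxChars) {
            return s;
        }
        return s.substring(0, maxChars) + "...(" + (s.length() - maxChars) + " more chars)";
    }

    public static void main(String[] args) {
        System.out.println(truncateForLog("short value", MAX_LOGGED_CHARS)); // short value
        System.out.println(truncateForLog("abcdefghij", 4));                 // abcd...(6 more chars)
    }
}
```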
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for In-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019191#comment-17019191 ] Ted Yu commented on KAFKA-9455: --- Maybe we can also look at (and profile) Maps from fastutil, such as: http://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/Object2ObjectSortedMap.html > Consider using TreeMap for In-memory stores of Streams > -- > > Key: KAFKA-9455 > URL: https://issues.apache.org/jira/browse/KAFKA-9455 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > From [~ableegoldman]: It's worth noting that it might be a good idea to > switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap > allows us to safely perform range queries without copying over the entire > keyset, but the performance on point queries seems to scale noticeably worse > with the number of unique keys. Point queries are used by aggregations while > range queries are used by windowed joins, but of course both are available > within the PAPI and for interactive queries so it's hard to say which we > should prefer. Maybe rather than make that tradeoff we should have one > version for efficient range queries (a "JoinWindowStore") and one for > efficient point queries ("AggWindowStore") - or something. I know we've had > similar thoughts for a different RocksDB store layout for Joins (although I > can't find that ticket anywhere..), it seems like the in-memory stores could > benefit from a special "Join" version as well cc/ Guozhang Wang > Here are some random thoughts: > 1. For kafka streams processing logic (i.e. without IQ), it's better to make > all processing logic rely on point queries rather than range queries. > Right now the only processors that use range queries are, as mentioned above, > windowed stream-stream joins.
I think we should consider using a different > window implementation for this (and as a result also get rid of the > retainDuplicate flags) to refactor the windowed stream-stream join operation. > 2. With 1), range queries would only be exposed as IQ. Depending on its usage > frequency I think it makes lots of sense to optimize for single-point queries. > Of course, even without step 1) we should still consider using tree-map for > windowed in-memory stores to have a better scaling effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
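The trade-off discussed here can be sketched with the two JDK sorted maps behind a minimal key-value interface. This is an illustration under stated assumptions, not the Streams in-memory store: `TreeMapStore` and `SkipListStore` are hypothetical. Point queries are plain `get` calls on both maps. For range queries, the `TreeMap` variant guards access with a lock and copies the requested range before returning it, because a `TreeMap` must not be modified while it is being iterated; the `ConcurrentSkipListMap` variant can return its `subMap` view directly, since that view's iterators are weakly consistent.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SortedStoreSketch {

    // Hypothetical store over a TreeMap: not thread-safe by itself, so every
    // access is synchronized, and ranges are copied before being returned.
    static final class TreeMapStore {
        private final TreeMap<String, String> map = new TreeMap<>();

        synchronized void put(String key, String value) {
            map.put(key, value);
        }

        synchronized String get(String key) { // point query
            return map.get(key);
        }

        synchronized Collection<String> range(String from, String to) { // inclusive range query
            return new ArrayList<>(map.subMap(from, true, to, true).values());
        }
    }

    // Hypothetical store over a ConcurrentSkipListMap: the returned subMap view
    // is live and weakly consistent, so it can be iterated without copying.
    static final class SkipListStore {
        private final ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();

        void put(String key, String value) {
            map.put(key, value);
        }

        String get(String key) {
            return map.get(key);
        }

        Collection<String> range(String from, String to) {
            return map.subMap(from, true, to, true).values();
        }
    }

    public static void main(String[] args) {
        TreeMapStore treeStore = new TreeMapStore();
        SkipListStore skipStore = new SkipListStore();
        for (String key : new String[] {"a", "b", "c", "d"}) {
            treeStore.put(key, key.toUpperCase());
            skipStore.put(key, key.toUpperCase());
        }
        System.out.println(treeStore.get("c") + " " + skipStore.get("c")); // C C
        System.out.println(treeStore.range("b", "c"));                      // [B, C]
        System.out.println(skipStore.range("b", "c"));                      // [B, C]
    }
}
```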
[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS
[ https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018602#comment-17018602 ] Ted Yu commented on KAFKA-9450: --- W.r.t. a separate column family: since the data in this family tends to be small compared to the data family, wouldn't we end up with small files, similar to those produced by a RocksDB memtable flush? > Decouple inner state flushing from committing with EOS > -- > > Key: KAFKA-9450 > URL: https://issues.apache.org/jira/browse/KAFKA-9450 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > When EOS is turned on, the commit interval is set quite low (100ms) and all > the store layers are flushed during a commit. This is necessary for > forwarding records in the cache to the changelog, but unfortunately also > forces rocksdb to flush the current memtable before it's full. The result is > a large number of small writes to disk, losing the benefits of batching, and > a large number of very small L0 files that are likely to slow compaction. > Since we have to delete the stores to recreate from scratch anyway during an > unclean shutdown with EOS, we may as well skip flushing the innermost > StateStore during a commit and only do so during a graceful shutdown, before > a rebalance, etc. This is currently blocked on a refactoring of the state > store layers to allow decoupling the flush of the caching layer from the > actual state store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
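The proposed decoupling can be modeled in a few lines. In this hypothetical sketch (none of these classes exist in Kafka Streams), `CachingStore.commit()` forwards cached records to a changelog list and to the inner store, but does not call `InnerStore.flushToDisk()`, which stands in for forcing a RocksDB memtable flush; that call is made only in `close()`, representing a graceful shutdown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DecoupledFlushSketch {

    // Hypothetical innermost store, standing in for RocksDB.
    static final class InnerStore {
        final Map<String, String> data = new HashMap<>();
        int diskFlushes = 0;

        void put(String key, String value) {
            data.put(key, value);
        }

        // Stands in for forcing the current memtable to disk.
        void flushToDisk() {
            diskFlushes++;
        }
    }

    // Hypothetical caching layer on top of the inner store.
    static final class CachingStore {
        final InnerStore inner = new InnerStore();
        final List<String> changelog = new ArrayList<>();
        private final Map<String, String> dirty = new LinkedHashMap<>();

        void put(String key, String value) {
            dirty.put(key, value);
        }

        // Commit: forward dirty cache entries to the changelog and the inner
        // store, but do not force the inner store to flush to disk.
        void commit() {
            for (Map.Entry<String, String> entry : dirty.entrySet()) {
                changelog.add(entry.getKey() + "=" + entry.getValue());
                inner.put(entry.getKey(), entry.getValue());
            }
            dirty.clear();
        }

        // Graceful shutdown: commit, then flush the inner store once.
        void close() {
            commit();
            inner.flushToDisk();
        }
    }

    public static void main(String[] args) {
        CachingStore store = new CachingStore();
        for (int i = 0; i < 3; i++) {
            store.put("k" + i, "v" + i);
            store.commit();
        }
        System.out.println("disk flushes after 3 commits: " + store.inner.diskFlushes); // 0
        store.close();
        System.out.println("disk flushes after close: " + store.inner.diskFlushes);     // 1
    }
}
```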
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM: Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: {code} public boolean await(long timeout, TimeUnit unit) {code} so that the execution time of handleRequests() can be bounded. {code} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } {code} was (Author: yuzhih...@gmail.com): Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: {code} public boolean await(long timeout, TimeUnit unit) {code} so that the execution time of handleRequests() can be bounded. 
{code} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } {code} > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between the controller-event-thread and > zk-session-expiry-handler threads. When this issue occurs, the only way > to recover the Kafka cluster is to restart the Kafka server. The following is the > jstack log of the controller-event-thread and zk-session-expiry-handler threads. 
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > waiting for the controller-event-thread to process the expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at >
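For reference, `CountDownLatch.await(long timeout, TimeUnit unit)` returns `false` if the waiting time elapses before the count reaches zero, and `true` otherwise. The minimal sketch below wraps it; `BoundedLatchWait` and `awaitBounded` are hypothetical names. As the proposed diff is written, the boolean result is discarded, so after a timeout `handleRequests()` would go on to return whatever responses had reached `responseQueue` by then; a caller that needs every response would have to check the return value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedLatchWait {

    // Waits at most timeoutMs for the latch. Returns true if the count reached
    // zero in time, false on timeout (or if the waiting thread was interrupted).
    static boolean awaitBounded(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        // Nobody counts this latch down, so a plain await() would block forever.
        CountDownLatch neverReleased = new CountDownLatch(1);
        System.out.println(awaitBounded(neverReleased, 100)); // false

        CountDownLatch released = new CountDownLatch(1);
        new Thread(released::countDown).start();
        System.out.println(awaitBounded(released, 5_000));    // true
    }
}
```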
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:22 AM: Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: {code} public boolean await(long timeout, TimeUnit unit) {code} so that the execution time of handleRequests() can be bounded. {code} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } {code} was (Author: yuzhih...@gmail.com): Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: ``` public boolean await(long timeout, TimeUnit unit) ``` so that the execution time of handleRequests() can be bounded. 
``` diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } ``` > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between the controller-event-thread and > zk-session-expiry-handler threads. When this issue occurs, the only way > to recover the Kafka cluster is to restart the Kafka server. The following is the > jstack log of the controller-event-thread and zk-session-expiry-handler threads. 
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > waiting for the controller-event-thread to process the expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at >
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:21 AM: Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: ``` public boolean await(long timeout, TimeUnit unit) ``` so that the execution time of handleRequests() can be bounded. ``` diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } ``` was (Author: yuzhih...@gmail.com): Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (this has yet to be verified by checking the server log for an exception in this code path). I wonder if we can utilize the following form of await: ``` public boolean await(long timeout, TimeUnit unit) ``` so that the execution time of handleRequests() can be bounded. > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between the controller-event-thread and > zk-session-expiry-handler threads.
When this issue occurs, the only way > to recover the Kafka cluster is to restart the Kafka server. The following is the > jstack log of the controller-event-thread and zk-session-expiry-handler threads. > "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > waiting for the controller-event-thread to process the expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at 
kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at > kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown > Source) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at >
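Below is a minimal Java sketch of the bounded wait suggested in the comment above; the real ZooKeeperClient is Scala. The class name, the simulated callback threads, and the `requests`/`answered` counts are hypothetical stand-ins for ZooKeeper responses, not Kafka code. The sketch shows one point. `CountDownLatch.await(long, TimeUnit)` returns `false` when the timeout elapses, so the caller can tell a complete batch from a partial one instead of parking forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: bounding a batch wait with CountDownLatch.await(long, TimeUnit).
// The simulated callbacks are hypothetical stand-ins for ZooKeeper responses.
public class BoundedAwait {

    // Whether every response arrived in time, plus the responses that did arrive.
    public static final class Result {
        public final boolean complete;
        public final List<String> responses;

        Result(boolean complete, List<String> responses) {
            this.complete = complete;
            this.responses = responses;
        }
    }

    // Issues `requests` hypothetical async requests, of which only `answered`
    // ever get a response, then waits at most timeoutMs for all of them.
    public static Result handleRequests(int requests, int answered, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(requests);
        ConcurrentLinkedQueue<String> responseQueue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < answered; i++) {
            final int id = i;
            // Simulated callback thread: enqueue the response, then count down.
            new Thread(() -> {
                responseQueue.add("response-" + id);
                latch.countDown();
            }).start();
        }
        boolean complete;
        try {
            // Unlike await(), this overload gives up and returns false on timeout.
            complete = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            complete = false;
        }
        return new Result(complete, new ArrayList<>(responseQueue));
    }
}
```

With this shape, `handleRequests(3, 3, 1000)` reports a complete batch. `handleRequests(3, 2, 100)` returns after roughly 100 ms with `complete == false`. The one-line diff above ignores that boolean, so a caller would presumably also need to decide how to surface a timed-out batch.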
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu edited comment on KAFKA-8532 at 1/17/20 10:42 PM: Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute (yet to be verified by checking the server log for an exception in this code path). I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
was (Author: yuzhih...@gmail.com): Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute. I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu commented on KAFKA-8532: --- Looking at js2.log, I am not sure a deadlock was observed. Maybe handleRequests() took very long to execute. I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018352#comment-17018352 ] Ted Yu commented on KAFKA-8532: --- Looking at KafkaController.scala in trunk, I don't see Expire.waitUntilProcessingStarted, which appears in the stack trace. It seems the class has since been refactored or had a bug fixed. [~lbdai3190] If you can attach the server log, that may help us find the root cause. Thanks
[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018175#comment-17018175 ] Ted Yu commented on KAFKA-8532: --- Created https://github.com/apache/kafka/pull/7978 > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between the controller-event-thread and > zk-session-expiry-handler0 threads. When this issue occurs, the only way > to recover the Kafka cluster is to restart the Kafka server. The following is the > jstack output for the controller-event-thread and zk-session-expiry-handler0 threads. > "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > waiting for controller-event-thread to process expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > 
kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at > kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown > Source) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Locked ownable synchronizers: > - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker) > "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 > nid=0x310 waiting on condition [0x7fccb55c8000] > java.lang.Thread.State: WAITING (parking) > at 
sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005d1be5a00> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157) > at >
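The two jstack excerpts above show both threads parked in `CountDownLatch.await`. Below is a minimal Java sketch of that shape of circular wait. It rests on one assumption that the trace does not establish: the controller's in-flight ZooKeeper request can only complete after the expiry handler makes progress. The latch names are hypothetical; only the thread names come from the report. Bounded waits are used so the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: two threads, each parked on a latch that only the other would count down.
public class CircularWaitDemo {

    // Returns true if both threads timed out, i.e. neither could make progress.
    public static boolean bothTimedOut(long timeoutMs) {
        // Hypothetical: counted down once the controller starts processing the expire event.
        CountDownLatch expireProcessingStarted = new CountDownLatch(1);
        // Hypothetical: counted down when the controller's in-flight ZooKeeper response arrives.
        CountDownLatch zkResponseReceived = new CountDownLatch(1);
        AtomicBoolean expiryHandlerTimedOut = new AtomicBoolean();
        AtomicBoolean controllerTimedOut = new AtomicBoolean();

        Thread expiryHandler = new Thread(() -> {
            try {
                // Waits for the controller before re-initializing the session...
                if (!expireProcessingStarted.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                    expiryHandlerTimedOut.set(true);
                    return;
                }
                zkResponseReceived.countDown(); // ...so this is never reached.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "zk-session-expiry-handler0");

        Thread controller = new Thread(() -> {
            try {
                // Waits for a response that (by assumption) needs the expiry handler first...
                if (!zkResponseReceived.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                    controllerTimedOut.set(true);
                    return;
                }
                expireProcessingStarted.countDown(); // ...so this is never reached.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "controller-event-thread");

        expiryHandler.start();
        controller.start();
        try {
            expiryHandler.join();
            controller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return expiryHandlerTimedOut.get() && controllerTimedOut.get();
    }
}
```

`bothTimedOut(100)` returns `true` after about 100 ms, because neither latch is ever counted down. With the unbounded `await()` seen in the trace, the same structure would simply hang.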
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152 ] Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM: How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
         inReadLock(initializationLock) {
           send(request) { response =>
             responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
           }
         }
-      } catch {
-        case e: Throwable =>
-          inFlightRequests.release()
-          throw e
+      } finally {
+        inFlightRequests.release()
+        countDownLatch.countDown()
       }
     }
     countDownLatch.await()
{code}
With this change, countDownLatch is handled consistently with inFlightRequests. I have run core:test, which passed. I can send out a PR.
was (Author: yuzhih...@gmail.com): How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
         inReadLock(initializationLock) {
           send(request) { response =>
             responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
           }
         }
-      } catch {
-        case e: Throwable =>
-          inFlightRequests.release()
-          throw e
+      } finally {
+        inFlightRequests.release()
+        countDownLatch.countDown()
       }
     }
     countDownLatch.await()
{code}
With this change, countDownLatch is handled consistently with inFlightRequests.
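Below is a minimal Java sketch of one way to keep the in-flight permit and the latch count in lockstep. It assumes `send` is asynchronous, as the callback-style `send(request) { response => ... }` in the diff suggests. Under that assumption, the success-path `countDown()` has to stay inside the callback. Counting down as soon as `send` returns would let `await` finish before the response is queued. The `AsyncSender` interface is hypothetical. So is recording a failed send as an `error:` entry instead of rethrowing; that choice is made only so the effect on the latch is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: every path that returns an in-flight permit also counts down the latch.
public class LockstepRelease {

    // Hypothetical async sender: either invokes the callback later or throws right away.
    public interface AsyncSender {
        void send(String request, Consumer<String> callback);
    }

    public static List<String> handleRequests(List<String> requests, AsyncSender sender,
                                              Semaphore inFlight, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(requests.size());
        ConcurrentLinkedQueue<String> responses = new ConcurrentLinkedQueue<>();
        for (String request : requests) {
            inFlight.acquireUninterruptibly();
            try {
                sender.send(request, response -> {
                    responses.add(response);
                    // Success path: the callback gives back both the permit and the count.
                    inFlight.release();
                    latch.countDown();
                });
            } catch (RuntimeException e) {
                // Failure path: the callback will never fire, so give both back here.
                responses.add("error:" + request);
                inFlight.release();
                latch.countDown();
            }
        }
        try {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(responses);
    }
}
```

If one of three requests throws during `send`, the batch still completes with three entries, and every permit is returned to the semaphore.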
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846843#comment-16846843 ] Ted Yu commented on KAFKA-5998: --- Can you move the state directory outside of /tmp, which is subject to cleanup by the OS? > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams application running. I have been running it > for two days under a load of around 2500 msgs per second. On the third day I am > getting the exception below for some of the partitions: I have 16 partitions, and only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
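Below is a minimal sketch of the suggestion in the comment above. It assumes a standard Kafka Streams setup, where the state directory is controlled by the `state.dir` setting (`StreamsConfig.STATE_DIR_CONFIG`). To stay dependency-free, the sketch uses the plain string key and `java.util.Properties`. The directory `/var/lib/kafka-streams` is a hypothetical example. The other settings a real application needs, such as `application.id` and `bootstrap.servers`, are omitted. The helper `underTmp` is also hypothetical; it just flags a path that sits under `/tmp`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Sketch: point Kafka Streams' state directory away from /tmp.
public class StateDirConfig {

    // Same key as StreamsConfig.STATE_DIR_CONFIG; a plain string keeps this dependency-free.
    public static final String STATE_DIR = "state.dir";

    // Builds a (partial) Streams configuration with an explicit state directory.
    public static Properties streamsProps(String stateDir) {
        Properties props = new Properties();
        props.put(STATE_DIR, stateDir);
        return props;
    }

    // True if the directory is /tmp or below it, where OS cleanup jobs may delete files.
    public static boolean underTmp(String dir) {
        Path p = Paths.get(dir).toAbsolutePath().normalize();
        return p.startsWith(Paths.get("/tmp"));
    }
}
```

For example, `underTmp("/tmp/kafka-streams")` is true, while `underTmp("/var/lib/kafka-streams")` is false. `Path.startsWith` compares whole path elements, so a sibling such as `/tmpdata` is not flagged.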
[jira] [Resolved] (FLINK-10446) Use the "guava beta checker" plugin to keep off of @Beta API
[ https://issues.apache.org/jira/browse/FLINK-10446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-10446. Resolution: Won't Fix > Use the "guava beta checker" plugin to keep off of @Beta API > > > Key: FLINK-10446 > URL: https://issues.apache.org/jira/browse/FLINK-10446 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Ted Yu >Assignee: Ji Liu >Priority: Major > > The Guava project publishes an Error Prone plugin that detects uses of APIs annotated with @Beta. Those APIs shouldn't be used because the project makes no promise to deprecate them before removal. > plugin: > https://github.com/google/guava-beta-checker -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123 ] Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:31 PM: https://pastebin.com/vyvQ8pkF shows what I mentioned earlier. In StateDirectory#cleanRemovedTasks, we use both last modification time of the directory and whether the task has been committed as criteria. When both are satisfied, we clean the directory for the task. was (Author: yuzhih...@gmail.com): https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete). In StateDirectory#cleanRemovedTasks, we use both last modification time of the directory and whether the task has been committed as criteria. When both are satisfied, we clean the directory for the task. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. 
On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at
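The comment above says cleanup requires two conditions together. Here is a minimal sketch of that check. It assumes the cleaner knows the directory's last modification time and has some way to learn whether the task has been committed. The class and method names (`CleanupPolicy`, `shouldClean`) are hypothetical and are not part of the Kafka Streams API.

```java
/** Hypothetical sketch of the two-criteria cleanup check; not actual Kafka Streams code. */
final class CleanupPolicy {
    private CleanupPolicy() { }

    /**
     * Returns true only when BOTH criteria hold:
     * 1) the task directory has been idle for longer than the cleanup delay, and
     * 2) the task has been committed, so no checkpoint write should still be in flight.
     */
    static boolean shouldClean(long lastModifiedMs, long nowMs, long cleanupDelayMs, boolean committed) {
        boolean idleLongEnough = nowMs - lastModifiedMs > cleanupDelayMs;
        return idleLongEnough && committed;
    }
}
```

Requiring both conditions means a long-idle directory is still kept if its task has not committed. The modification-time check alone could not give that protection.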
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123 ] Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:12 PM: https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete). In StateDirectory#cleanRemovedTasks, we use two criteria: the last modification time of the directory and whether the task has been committed. Only when both are satisfied do we clean the directory for the task. was (Author: yuzhih...@gmail.com): https://pastebin.com/Y247UZgb shows what I mentioned earlier. In StateDirectory#cleanRemovedTasks, we don't rely on the last modification time; instead, we check whether the task has been committed.
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123 ] Ted Yu commented on KAFKA-5998: --- https://pastebin.com/Y247UZgb shows what I mentioned earlier. In StateDirectory#cleanRemovedTasks, we don't rely on the last modification time; instead, we check whether the task has been committed.
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827089#comment-16827089 ] Ted Yu commented on KAFKA-5998: --- [~mparthas]: From your comment on Apr 8th, it seems that increasing the value of "state.cleanup.delay.ms" didn't prevent .checkpoint.tmp from disappearing. Did you see a log line similar to the following before the error? {code} April 25th 2019, 21:03:49.332 2019-04-25 21:03:49,332 INFO [org.apache.kafka.streams.processor.internals.StateDirectory] (application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread) - [short-component-name:; transaction-id:; user-id:; creation-time:] stream-thread [application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread] Deleting obsolete state directory 1_1 for task 1_1 as 813332ms has elapsed (cleanup delay is 60ms). {code} If so, that might indicate that the new value for "state.cleanup.delay.ms" was still too short. If not, there could be another reason for the problem. In case the log has already been rotated out, please keep an eye out for future occurrences.
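To answer the question above across many log files, you could search for the cleanup message and compare the elapsed time with the configured delay. The sketch below is a hypothetical helper (`CleanupLogScanner` is not part of Kafka). Its regular expression matches only the message wording quoted in the comment, and that wording may differ between Kafka versions.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that extracts fields from StateDirectory cleanup log lines. */
final class CleanupLogScanner {
    // Assumes the message text quoted in the comment; other versions may word it differently.
    private static final Pattern DELETION = Pattern.compile(
            "Deleting obsolete state directory (\\S+) for task (\\S+) as (\\d+)ms has elapsed \\(cleanup delay is (\\d+)ms\\)");

    /** Parsed fields of one cleanup log line. */
    static final class Deletion {
        final String directory;
        final String taskId;
        final long elapsedMs;
        final long cleanupDelayMs;

        Deletion(String directory, String taskId, long elapsedMs, long cleanupDelayMs) {
            this.directory = directory;
            this.taskId = taskId;
            this.elapsedMs = elapsedMs;
            this.cleanupDelayMs = cleanupDelayMs;
        }
    }

    /** Returns the parsed deletion if the line contains a cleanup message, otherwise empty. */
    static Optional<Deletion> parse(String logLine) {
        Matcher m = DELETION.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Deletion(m.group(1), m.group(2),
                Long.parseLong(m.group(3)), Long.parseLong(m.group(4))));
    }
}
```

The task IDs that turn up can then be checked against the partitions that failed (0_0 and 0_1 in the report). This would show whether the CleanupThread removed those task directories shortly before the FileNotFoundException.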
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822564#comment-16822564 ] Ted Yu commented on KAFKA-5998: --- One question I had after reading the recent comments: if the state directory is used by more than one process, wouldn't the problem also manifest in ways other than a disappearing checkpoint file (such as a corrupted checkpoint)? My earlier comment on reference counting was not fully worked out; once the question above is answered, we may be able to come up with a good solution.
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822446#comment-16822446 ] Ted Yu commented on KAFKA-5998: --- I wonder if we could keep a file alongside the checkpoint file that records a reference count (of the processor instances using it). The checkpoint file would then be deleted only when the count reaches zero.
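Here is a minimal sketch of the reference-count idea above. It assumes a hypothetical `.refcount` file in the task directory. An OS-level `FileLock` serializes updates across processes, and `synchronized` keeps threads that share one instance from overlapping locks. This shows the proposal only; Kafka Streams has no such file or class. Two separate instances for the same directory inside one JVM could still hit `OverlappingFileLockException`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: a ".refcount" file next to ".checkpoint" counts the instances using it. */
final class RefCountedCheckpoint {
    private final Path refCountFile;
    private final Path checkpointFile;

    RefCountedCheckpoint(Path taskDir) {
        this.refCountFile = taskDir.resolve(".refcount");   // hypothetical file name
        this.checkpointFile = taskDir.resolve(".checkpoint");
    }

    /** Registers one more user of the checkpoint; returns the new count. */
    synchronized int acquire() throws IOException {
        return update(1);
    }

    /** Unregisters one user; deletes the checkpoint when the count drops to zero. */
    synchronized int release() throws IOException {
        return update(-1);
    }

    private int update(int delta) throws IOException {
        try (FileChannel ch = FileChannel.open(refCountFile, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileLock lock = ch.lock()) {   // exclusive lock, released when the try block exits
            ByteBuffer buf = ByteBuffer.allocate((int) ch.size());
            while (buf.hasRemaining() && ch.read(buf, buf.position()) > 0) {
                // keep reading until the whole (small) file is in the buffer
            }
            String text = new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8).trim();
            int count = Math.max(0, (text.isEmpty() ? 0 : Integer.parseInt(text)) + delta);
            ch.truncate(0);
            ch.write(ByteBuffer.wrap(Integer.toString(count).getBytes(StandardCharsets.UTF_8)), 0);
            if (count == 0 && delta < 0) {
                // Last user is gone: delete while still holding the lock so no acquire() can interleave.
                Files.deleteIfExists(checkpointFile);
            }
            return count;
        }
    }
}
```

The checkpoint is deleted inside the same locked section that decrements the count. That closes the window in which another process could acquire a reference between the count reaching zero and the deletion.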
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811681#comment-16811681 ] Ted Yu commented on KAFKA-5998: --- I made the following change:
{code}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index badaa36cd..be29ebe56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -138,6 +138,7 @@ public abstract class AbstractJoinIntegrationTest {
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
+        STREAMS_CONFIG.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
{code}
Regarding the following check in KafkaStreams#start:
{code}
if (state == State.RUNNING) {
    stateDirectory.cleanRemovedTasks(cleanupDelay);
{code}
During TableTableJoinIntegrationTest the state was REBALANCING, so the cleanup was never triggered.
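The observation above depends on the state gate: a periodic cleanup tick does nothing unless the client is RUNNING. Here is a minimal sketch of that gating. It assumes the check runs on a fixed-rate scheduler. The `GatedCleaner` class and its simplified `State` enum are hypothetical stand-ins, not the real `KafkaStreams` types.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a periodic state-directory cleaner gated on the client state. */
final class GatedCleaner {
    enum State { CREATED, REBALANCING, RUNNING }   // simplified subset, assumed for illustration

    private volatile State state = State.CREATED;
    private final Runnable cleanup;                // stands in for stateDirectory.cleanRemovedTasks(...)
    final AtomicInteger cleanupRuns = new AtomicInteger();

    GatedCleaner(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    void setState(State newState) {
        state = newState;
    }

    /** One scheduler tick: cleanup only runs while RUNNING, so ticks during REBALANCING are skipped. */
    void tick() {
        if (state == State.RUNNING) {
            cleanup.run();
            cleanupRuns.incrementAndGet();
        }
    }

    /** Schedules tick() every cleanupDelayMs on the given scheduler. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long cleanupDelayMs) {
        return scheduler.scheduleAtFixedRate(this::tick, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
    }
}
```

This is why setting the delay to "1" in a test is not enough by itself. If every tick lands while the state is REBALANCING, the cleanup body never runs.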
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811603#comment-16811603 ] Ted Yu commented on KAFKA-5998: --- It seems StreamTask can keep track of active tasks. After commit() finishes, StreamTask can inform stateDirectory.cleanRemovedTasks that the underlying task is subject to cleaning. This would be more robust than depending on period of inactivity alone for cleaning. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Major > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > 
o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
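Ted Yu's suggestion in the first comment, cleaning a task directory only once its task is known to be inactive rather than on idle time alone, could look roughly like the sketch below. It is a self-contained illustration, not Kafka Streams code: `TaskDirCleaner`, `markActive`, `markInactive` and `dirsToClean` are hypothetical names, and the real `StateDirectory.cleanRemovedTasks` is not modeled here. Timestamps are passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a task directory is eligible for cleanup only if its task
// is NOT currently active AND it has been idle for longer than the cleanup delay.
class TaskDirCleaner {
    private final long cleanupDelayMs;
    private final Set<String> activeTasks = new HashSet<>();
    private final Map<String, Long> lastTouchedMs = new HashMap<>();

    TaskDirCleaner(long cleanupDelayMs) {
        this.cleanupDelayMs = cleanupDelayMs;
    }

    // Called when a task is assigned; its directory must never be removed while active.
    public synchronized void markActive(String taskId, long nowMs) {
        activeTasks.add(taskId);
        lastTouchedMs.put(taskId, nowMs);
    }

    // Called after the task's final commit (e.g. when it is revoked).
    public synchronized void markInactive(String taskId, long nowMs) {
        activeTasks.remove(taskId);
        lastTouchedMs.put(taskId, nowMs);
    }

    // Returns the task directories that are safe to delete at time nowMs.
    public synchronized List<String> dirsToClean(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastTouchedMs.entrySet()) {
            boolean idleLongEnough = nowMs - e.getValue() > cleanupDelayMs;
            if (!activeTasks.contains(e.getKey()) && idleLongEnough) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Keeping the idle-time check as a second condition preserves the delay-based behavior for tasks that have genuinely moved away, while the directory of an active task (such as 0_0 or 0_1 in the report) is never eligible, no matter how long it has gone unmodified.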
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551 ] Ted Yu edited comment on KAFKA-5998 at 4/6/19 10:55 AM: Cleanup delay is controlled by: {code} public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; {code} Maybe a longer value could be specified so that CleanupThread doesn't clean up the task directory prematurely (in case the task directory hasn't been modified for a while). was (Author: yuzhih...@gmail.com): Cleanup delay is controlled by: {code} public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; {code} Maybe a longer value could be specified so that CleanupThread doesn't clean up the task directory prematurely.
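Following the comment above, raising the delay is a plain configuration change. The sketch assumes the application builds its config as `java.util.Properties`; it uses the literal key so it compiles without Kafka on the classpath, whereas real code would normally reference `StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG`. The class and method names are hypothetical. The default delay is 600000 ms (10 minutes).

```java
import java.util.Properties;

// Sketch: copy an existing Streams config and raise the state cleanup delay.
class CleanupDelayConfig {
    // Same string value as StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG.
    static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";

    static Properties withCleanupDelay(Properties base, long delayMs) {
        Properties props = new Properties();
        props.putAll(base); // keep application.id, bootstrap.servers, etc.
        // Stored as a String; StreamsConfig parses LONG settings from strings.
        props.setProperty(STATE_CLEANUP_DELAY_MS_CONFIG, String.valueOf(delayMs));
        return props;
    }
}
```

For example, `CleanupDelayConfig.withCleanupDelay(props, 60L * 60 * 1000)` would set a one-hour delay; that value is only an illustration. A longer delay makes premature cleanup of an idle-but-assigned task less likely, but does not rule it out.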
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551 ] Ted Yu commented on KAFKA-5998: --- Cleanup delay is controlled by: {code} public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; {code} Maybe a longer value could be specified so that CleanupThread doesn't clean up the task directory prematurely.
[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549 ] Ted Yu edited comment on KAFKA-3729 at 3/18/19 3:06 AM: 3729.v6.txt shows the progress toward passing StreamsConfig to Store#init(). The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes. The assertion fails without proper Store init. Note: PR #6461 doesn't require a KIP. was (Author: yuzhih...@gmail.com): 3729.v6.txt shows the progress toward passing StreamsConfig to Store#init(). The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes. The assertion fails without proper Store init. > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
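The approach discussed in this thread, handing the application config to a store's init so that user-supplied serdes are configured the same way default serdes are, can be illustrated with simplified stand-in types. `ConfigurableSerde`, `StoreContext`, `SerdeAwareStore` and `RecordingSerde` are hypothetical; they only mirror the shape of Kafka's `Serde#configure(Map<String, ?>, boolean)` and `ProcessorContext#appConfigs()`. This is a sketch under those assumptions, not the code in 3729.v6.txt or PR #6461.

```java
import java.util.Map;

// Hypothetical stand-in for Kafka's Serde#configure(Map<String, ?>, boolean).
interface ConfigurableSerde {
    void configure(Map<String, ?> configs, boolean isKey);
}

// Hypothetical stand-in for ProcessorContext#appConfigs().
interface StoreContext {
    Map<String, Object> appConfigs();
}

class SerdeAwareStore {
    private final ConfigurableSerde keySerde;
    private final ConfigurableSerde valueSerde;

    SerdeAwareStore(ConfigurableSerde keySerde, ConfigurableSerde valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // On init, pass the application configs to the serdes supplied with the store,
    // as is already done for the default serdes.
    void init(StoreContext context) {
        Map<String, Object> configs = context.appConfigs();
        keySerde.configure(configs, true);
        valueSerde.configure(configs, false);
    }
}

// Test helper: records what configure() received.
class RecordingSerde implements ConfigurableSerde {
    Map<String, ?> seen;
    Boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.seen = configs;
        this.isKey = isKey;
    }
}
```

Configuring in init rather than at topology-build time means the serdes see the final, fully resolved application config.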
[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-3729: -- Attachment: (was: 3729.v6.txt) > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549 ] Ted Yu edited comment on KAFKA-3729 at 3/17/19 6:39 PM: 3729.v6.txt shows the progress toward passing StreamsConfig to Store#init(). The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes. The assertion fails without proper Store init. was (Author: yuzhih...@gmail.com): 3729.v6.txt shows the progress toward passing StreamsConfig to Store#init(). > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-3729: -- Attachment: 3729.v6.txt > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549 ] Ted Yu commented on KAFKA-3729: --- 3729.v6.txt shows the progress toward passing StreamsConfig to Store#init(). > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-3729: -- Attachment: 3729.v6.txt > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Assignee: Ted Yu >Priority: Major > Labels: api, newbie > Attachments: 3729.txt, 3729.v6.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787312#comment-16787312 ] Ted Yu commented on KAFKA-3729: --- https://github.com/apache/kafka/pull/6399 > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Priority: Major > Labels: api, newbie > Attachments: 3729.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787272#comment-16787272 ] Ted Yu commented on KAFKA-3729: --- The following seems to result in a compilation error: {code} /Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:667: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer sn.getKeySer().configure(config.originals(), true); ^ /Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:668: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer sn.getValueSer().configure(config.originals(), false); ^ {code} I wonder how these warnings can be suppressed. I checked the existing calls to configure(), but they didn't give me much of a clue. > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Priority: Major > Labels: api, newbie > Attachments: 3729.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
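The warnings quoted in the comment above come from calling `configure()` through a raw `Serializer` reference. Two common ways to deal with this are sketched below using a hypothetical stand-in interface, `SimpleSerializer<T>`, shaped like Kafka's `Serializer<T>`: suppress the warning locally with `@SuppressWarnings`, or declare the reference with a wildcard type. The wildcard version compiles without the warning because `configure(Map<String, ?>, boolean)` does not mention `T`. Which option fits the actual patch depends on the declared return types of `getKeySer()`/`getValueSer()`, which the comment does not show; the helper names here are illustrative only.

```java
import java.util.Map;

// Simplified, hypothetical stand-in shaped like Kafka's Serializer<T>.
interface SimpleSerializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
}

class ConfigureWithoutRawTypes {
    // Raw-typed parameter: javac -Xlint would report an unchecked call here.
    // @SuppressWarnings silences it for this method only.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void configureRaw(SimpleSerializer ser, Map<String, Object> configs, boolean isKey) {
        ser.configure(configs, isKey);
    }

    // Wildcard-typed parameter: configure() does not depend on T,
    // so no unchecked warning is produced and no suppression is needed.
    static void configureWildcard(SimpleSerializer<?> ser, Map<String, Object> configs, boolean isKey) {
        ser.configure(configs, isKey);
    }
}
```

The wildcard form is usually preferable, since it keeps the compiler's type checks instead of silencing them.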
[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16785119#comment-16785119 ] Ted Yu commented on KAFKA-3729: --- Attached a tentative patch. If it is on the right track, I can send out a PR. > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Priority: Major > Labels: api, newbie > Attachments: 3729.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-3729: -- Attachment: 3729.txt > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Priority: Major > Labels: api, newbie > Attachments: 3729.txt > > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder
[ https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784755#comment-16784755 ] Ted Yu commented on KAFKA-3729: --- Can AbstractProcessorContext#appConfigs() be used to obtain the Map that configure() uses? > Auto-configure non-default SerDes passed alongside the topology builder > > > Key: KAFKA-3729 > URL: https://issues.apache.org/jira/browse/KAFKA-3729 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Fred Patton >Priority: Major > Labels: api, newbie > > From Guozhang Wang: > "Only default serdes provided through configs are auto-configured today. But > we could auto-configure other serdes passed alongside the topology builder as > well." -- This message was sent by Atlassian JIRA (v7.6.3#76005)