[jira] [Resolved] (FLINK-32025) Make job cancellation button on UI configurable

2023-05-07 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-32025.

Resolution: Duplicate

> Make job cancellation button on UI configurable
> ---
>
> Key: FLINK-32025
> URL: https://issues.apache.org/jira/browse/FLINK-32025
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Major
>
> On the Flink job UI, there is a `Cancel Job` button.
> When the job UI is shown to users, it is desirable to hide the button so that 
> a normal user doesn't mistakenly cancel a long-running Flink job.
> This issue adds a configuration option for hiding the `Cancel Job` button.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32025) Make job cancellation button on UI configurable

2023-05-07 Thread Ted Yu (Jira)
Ted Yu created FLINK-32025:
--

 Summary: Make job cancellation button on UI configurable
 Key: FLINK-32025
 URL: https://issues.apache.org/jira/browse/FLINK-32025
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu


On the Flink job UI, there is a `Cancel Job` button.

When the job UI is shown to users, it is desirable to hide the button so that 
a normal user doesn't mistakenly cancel a long-running Flink job.

This issue adds a configuration option for hiding the `Cancel Job` button.
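The Duplicate resolution above suggests an existing option already covers 
this. As an illustration, assuming Flink's `web.cancel.enable` flag (which 
controls whether jobs can be canceled from the web frontend), the 
configuration would be:
{code}
# flink-conf.yaml -- illustrative; assumes the web.cancel.enable option
# Setting it to false hides/disables job cancellation in the web UI.
web.cancel.enable: false
{code}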



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor

2023-01-16 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-42090:
---
Description: 
Previously a boolean variable, saslTimeoutSeen, was used in 
RetryingBlockTransferor. However, the boolean variable wouldn't cover the 
following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though the IOException at #2 is retried (resulting in an increment of 
retryCount), retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying 
SaslTimeoutException, we should keep a counter for SaslTimeoutException retries 
and subtract the value of this counter from retryCount.
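A minimal Scala sketch of the proposed bookkeeping (illustrative names; the 
actual change belongs in the Java class RetryingBlockTransferor):
{code}
// Count SASL-caused retries separately so that undoing them does not also
// wipe out retries caused by other errors (the IOExceptions above).
class SaslTimeoutException(msg: String) extends Exception(msg) // stand-in

class RetryBookkeeping {
  private var retryCount = 0      // all retries performed so far
  private var saslRetryCount = 0  // subset caused by SaslTimeoutException

  def onRetry(t: Throwable): Unit = {
    if (t.isInstanceOf[SaslTimeoutException]) saslRetryCount += 1
    retryCount += 1
  }

  // Where the old code cleared retryCount based on saslTimeoutSeen, subtract
  // only the SASL-caused increments instead of resetting the whole counter.
  def undoSaslRetries(): Unit = {
    retryCount -= saslRetryCount
    saslRetryCount = 0
  }

  def retriesSoFar: Int = retryCount
}
{code}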

  was:
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean 
variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though IOException at #2 is retried (resulting in increment of 
retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying 
SaslTimeoutException, we should keep a counter for SaslTimeoutException retries 
and subtract the value of this counter from retryCount.


> Introduce sasl retry count in RetryingBlockTransferor
> -
>
> Key: SPARK-42090
> URL: https://issues.apache.org/jira/browse/SPARK-42090
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ted Yu
>Priority: Major
>
> Previously a boolean variable, saslTimeoutSeen, was used in 
> RetryingBlockTransferor. However, the boolean variable wouldn't cover the 
> following scenario:
> 1. SaslTimeoutException
> 2. IOException
> 3. SaslTimeoutException
> 4. IOException
> Even though the IOException at #2 is retried (resulting in an increment of 
> retryCount), retryCount would be cleared at step #4.
> Since the intention of saslTimeoutSeen is to undo the increment due to 
> retrying SaslTimeoutException, we should keep a counter for 
> SaslTimeoutException retries and subtract the value of this counter from 
> retryCount.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor

2023-01-16 Thread Ted Yu (Jira)
Ted Yu created SPARK-42090:
--

 Summary: Introduce sasl retry count in RetryingBlockTransferor
 Key: SPARK-42090
 URL: https://issues.apache.org/jira/browse/SPARK-42090
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ted Yu


Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean 
variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though the IOException at #2 is retried (resulting in an increment of 
retryCount), retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying 
SaslTimeoutException, we should keep a counter for SaslTimeoutException retries 
and subtract the value of this counter from retryCount.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41853) Use Map in place of SortedMap for ErrorClassesJsonReader

2023-01-02 Thread Ted Yu (Jira)
Ted Yu created SPARK-41853:
--

 Summary: Use Map in place of SortedMap for ErrorClassesJsonReader
 Key: SPARK-41853
 URL: https://issues.apache.org/jira/browse/SPARK-41853
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 3.2.3
Reporter: Ted Yu


The use of SortedMap in ErrorClassesJsonReader was mostly for making tests 
easier to write.

This PR replaces SortedMap with Map, since SortedMap lookups are slower than 
Map's.
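A small illustration of the swap (stand-in data, not the reader's actual 
field):
{code}
import scala.collection.immutable.SortedMap

// Before: the tree-backed SortedMap pays O(log n) key comparisons per lookup.
val before: SortedMap[String, String] = SortedMap("DIVIDE_BY_ZERO" -> "...")
// After: a plain immutable Map offers effectively constant-time lookup.
val after: Map[String, String] = Map("DIVIDE_BY_ZERO" -> "...")
// Lookup code is unchanged either way:
val msg = after.get("DIVIDE_BY_ZERO")
{code}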



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41705) Move generate_protos.sh to dev/

2022-12-25 Thread Ted Yu (Jira)
Ted Yu created SPARK-41705:
--

 Summary: Move generate_protos.sh to dev/ 
 Key: SPARK-41705
 URL: https://issues.apache.org/jira/browse/SPARK-41705
 Project: Spark
  Issue Type: Task
  Components: Connect
Affects Versions: 3.4.0
Reporter: Ted Yu


connector/connect/dev contains only one script. Moving generate_protos.sh to 
dev/ follows the practice for other scripts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41419) [K8S] Decrement PVC_COUNTER when the pod deletion happens

2022-12-06 Thread Ted Yu (Jira)
Ted Yu created SPARK-41419:
--

 Summary: [K8S] Decrement PVC_COUNTER when the pod deletion happens 
 Key: SPARK-41419
 URL: https://issues.apache.org/jira/browse/SPARK-41419
 Project: Spark
  Issue Type: Task
  Components: Kubernetes
Affects Versions: 3.4.0
Reporter: Ted Yu


commit cc55de3 introduced PVC_COUNTER to track the outstanding number of PVCs.

PVC_COUNTER should only be decremented when pod deletion happens (in response 
to an error).

If the PVC isn't created successfully (in which case PVC_COUNTER isn't 
incremented), possibly because execution never reaches the 
resource(pvc).create() call, we shouldn't decrement the counter.
The variable `success` tracks the progress of PVC creation:

value 0 means the PVC has not been created.
value 1 means the PVC has been created.
value 2 means the PVC has been created but, due to a subsequent error, the pod 
is deleted.
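A Scala sketch of the intended accounting (illustrative function names; the 
real logic lives in Spark's Kubernetes executor pod allocation code):
{code}
import java.util.concurrent.atomic.AtomicInteger

val PVC_COUNTER = new AtomicInteger(0) // outstanding PVCs

def allocate(createPvc: () => Unit, createPod: () => Unit,
    deletePod: () => Unit): Unit = {
  var success = 0 // 0: PVC not created, 1: created, 2: created + pod deleted
  try {
    createPvc()                   // the resource(pvc).create() step
    PVC_COUNTER.incrementAndGet() // increment only after creation succeeds
    success = 1
    createPod()
  } catch {
    case e: Exception =>
      if (success == 1) {         // only undo an increment that happened
        deletePod()
        PVC_COUNTER.decrementAndGet()
        success = 2
      }
      throw e
  }
}
{code}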





--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41274) Bump Kubernetes Client Version to 6.2.0

2022-11-26 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved SPARK-41274.

Resolution: Duplicate

This is a dup of commit 02a2242a45062755bf7e20805958d5bdf1f5ed74

> Bump Kubernetes Client Version to 6.2.0
> ---
>
> Key: SPARK-41274
> URL: https://issues.apache.org/jira/browse/SPARK-41274
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Ted Yu
>Priority: Major
>
> Bump Kubernetes Client Version to 6.2.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41274) Bump Kubernetes Client Version to 6.2.0

2022-11-26 Thread Ted Yu (Jira)
Ted Yu created SPARK-41274:
--

 Summary: Bump Kubernetes Client Version to 6.2.0
 Key: SPARK-41274
 URL: https://issues.apache.org/jira/browse/SPARK-41274
 Project: Spark
  Issue Type: Improvement
  Components: Kubernetes
Affects Versions: 3.4.0
Reporter: Ted Yu


Bump Kubernetes Client Version to 6.2.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41197) Upgrade Kafka version to 3.3 release

2022-11-18 Thread Ted Yu (Jira)
Ted Yu created SPARK-41197:
--

 Summary: Upgrade Kafka version to 3.3 release
 Key: SPARK-41197
 URL: https://issues.apache.org/jira/browse/SPARK-41197
 Project: Spark
  Issue Type: Improvement
  Components: Java API
Affects Versions: 3.3.1
Reporter: Ted Yu


Kafka 3.3 has been released.

This issue upgrades the Kafka dependency to the 3.3 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40508) Treat unknown partitioning as UnknownPartitioning

2022-09-20 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-40508:
---
Description: 
When running a Spark application against Spark 3.3, I see the following:
{code}
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning 
type: CustomPartitioning
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
{code}
CustomPartitioning works fine with Spark 3.2.1.
This PR proposes to relax the code and treat all unknown partitioning types 
the same way as UnknownPartitioning.
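A Scala sketch of the proposed relaxation (stand-in types, not the actual 
connector interfaces):
{code}
sealed trait Partitioning { def numPartitions: Int }
case class KeyGroupedPartitioning(numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning
case class CustomPartitioning(numPartitions: Int) extends Partitioning

def normalize(p: Partitioning): Partitioning = p match {
  case k: KeyGroupedPartitioning => k // the case Spark can actually exploit
  // Previously anything else threw IllegalArgumentException; instead, fall
  // back to UnknownPartitioning so unrecognized custom types still run:
  case other => UnknownPartitioning(other.numPartitions)
}
{code}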

  was:
When running spark application against spark 3.3, I see the following :
```
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning 
type: CustomPartitioning
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
```
The CustomPartitioning works fine with Spark 3.2.1
This PR proposes to relax the code and treat all unknown partitioning the same 
way as that for UnknownPartitioning.


> Treat unknown partitioning as UnknownPartitioning
> -
>
> Key: SPARK-40508
> URL: https://issues.apache.org/jira/browse/SPARK-40508
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Ted Yu
>Priority: Major
>
> When running a Spark application against Spark 3.3, I see the following:
> {code}
> java.lang.IllegalArgumentException: Unsupported data source V2 partitioning 
> type: CustomPartitioning
> at 
> org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
> at 
> org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
> {code}
> CustomPartitioning works fine with Spark 3.2.1.
> This PR proposes to relax the code and treat all unknown partitioning types 
> the same way as UnknownPartitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40508) Treat unknown partitioning as UnknownPartitioning

2022-09-20 Thread Ted Yu (Jira)
Ted Yu created SPARK-40508:
--

 Summary: Treat unknown partitioning as UnknownPartitioning
 Key: SPARK-40508
 URL: https://issues.apache.org/jira/browse/SPARK-40508
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.3.0
Reporter: Ted Yu


When running a Spark application against Spark 3.3, I see the following:
```
java.lang.IllegalArgumentException: Unsupported data source V2 partitioning 
type: CustomPartitioning
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:46)
at 
org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioning$$anonfun$apply$1.applyOrElse(V2ScanPartitioning.scala:34)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
```
CustomPartitioning works fine with Spark 3.2.1.
This PR proposes to relax the code and treat all unknown partitioning types 
the same way as UnknownPartitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-39769) Rename trait Unevaluable

2022-07-13 Thread Ted Yu (Jira)
Ted Yu created SPARK-39769:
--

 Summary: Rename trait Unevaluable
 Key: SPARK-39769
 URL: https://issues.apache.org/jira/browse/SPARK-39769
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.3.0
Reporter: Ted Yu


I came upon `trait Unevaluable`, which is defined in 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Unevaluable is not a word.

There are `valuable` and `invaluable`, but I have never seen `Unevaluable`.

This issue renames the trait to `Unevaluatable`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running

2022-04-22 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-38998:
---
Description: 
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call 
getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
at org.apache.spark.SparkContext.(SparkContext.scala:597)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems the MetricsSystem was somehow stopped between the start() and 
getServletHandlers() calls.
SparkContext should check that the MetricsSystem is running before calling 
getServletHandlers().
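A Scala sketch of the suggested guard (hypothetical helper and accessor 
names; not the actual SparkContext code):
{code}
// Poll until the metrics system reports running, with a bounded timeout,
// before asking it for servlet handlers.
def waitUntilRunning(isRunning: () => Boolean,
    timeoutMs: Long = 10000L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!isRunning() && System.currentTimeMillis() < deadline) {
    Thread.sleep(10) // brief back-off between checks
  }
  isRunning()
}

// Usage sketch (metricsSystem.isRunning is an assumed accessor):
// if (waitUntilRunning(() => metricsSystem.isRunning)) {
//   ui.foreach(_.attachHandler(metricsSystem.getServletHandlers))
// }
{code}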

  was:
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call 
getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
at org.apache.spark.SparkContext.(SparkContext.scala:597)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems SparkContext should wait till the MetricsSystem becomes running before 
calling getServletHandlers()


> Call to MetricsSystem#getServletHandlers() may take place before 
> MetricsSystem becomes running
> --
>
> Key: SPARK-38998
> URL: https://issues.apache.org/jira/browse/SPARK-38998
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2
>Reporter: Ted Yu
>Priority: Major
>
> Sometimes the following exception is observed:
> {code}
> java.lang.IllegalArgumentException: requirement failed: Can only call 
> getServletHandlers on a running MetricsSystem
> at scala.Predef$.require(Predef.scala:281)
> at 
> org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
> at org.apache.spark.SparkContext.(SparkContext.scala:597)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> {code}
> It seems the MetricsSystem was somehow stopped between the start() and 
> getServletHandlers() calls.
> SparkContext should check that the MetricsSystem is running before calling 
> getServletHandlers().



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running

2022-04-22 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-38998:
---
Description: 
Sometimes the following exception is observed:
{code}
java.lang.IllegalArgumentException: requirement failed: Can only call 
getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
at org.apache.spark.SparkContext.(SparkContext.scala:597)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
{code}
It seems SparkContext should wait until the MetricsSystem is running before 
calling getServletHandlers().

  was:
Sometimes the following exception is observed:
```
java.lang.IllegalArgumentException: requirement failed: Can only call 
getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
at org.apache.spark.SparkContext.(SparkContext.scala:597)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
```
It seems SparkContext should wait till the MetricsSystem becomes running before 
calling getServletHandlers()


> Call to MetricsSystem#getServletHandlers() may take place before 
> MetricsSystem becomes running
> --
>
> Key: SPARK-38998
> URL: https://issues.apache.org/jira/browse/SPARK-38998
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2
>Reporter: Ted Yu
>Priority: Major
>
> Sometimes the following exception is observed:
> {code}
> java.lang.IllegalArgumentException: requirement failed: Can only call 
> getServletHandlers on a running MetricsSystem
> at scala.Predef$.require(Predef.scala:281)
> at 
> org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
> at org.apache.spark.SparkContext.(SparkContext.scala:597)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> {code}
> It seems SparkContext should wait until the MetricsSystem is running 
> before calling getServletHandlers().



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38998) Call to MetricsSystem#getServletHandlers() may take place before MetricsSystem becomes running

2022-04-22 Thread Ted Yu (Jira)
Ted Yu created SPARK-38998:
--

 Summary: Call to MetricsSystem#getServletHandlers() may take place 
before MetricsSystem becomes running
 Key: SPARK-38998
 URL: https://issues.apache.org/jira/browse/SPARK-38998
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.2
Reporter: Ted Yu


Sometimes the following exception is observed:
```
java.lang.IllegalArgumentException: requirement failed: Can only call 
getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:92)
at org.apache.spark.SparkContext.(SparkContext.scala:597)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
```
It seems SparkContext should wait until the MetricsSystem is running before 
calling getServletHandlers().



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'

2021-02-24 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17290646#comment-17290646
 ] 

Ted Yu commented on SPARK-34532:


Included the test command and some more information in the description.

You should see these errors when you run the command.

> IntervalUtils.add() may result in 'long overflow'
> -
>
> Key: SPARK-34532
> URL: https://issues.apache.org/jira/browse/SPARK-34532
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.2
>Reporter: Ted Yu
>Priority: Major
>
> I noticed the following when running the test suite:
> build/sbt "sql/testOnly *SQLQueryTestSuite"
> {code}
> 19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 
> 6416.0 failed 1 times; aborting job
> [info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
> 19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 
> in stage 6476.0 (TID 7789)
> java.lang.ArithmeticException: long overflow
> at java.lang.Math.multiplyExact(Math.java:892)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
> at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
> at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> {code}
> {code}
> 19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 
> in stage 14744.0 (TID 16705)
> java.lang.ArithmeticException: long overflow
> at java.lang.Math.addExact(Math.java:809)
> at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
> at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
> at 
> org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
> at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
> at 
> org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
> {code}
> This likely was caused by the following line:
> {code}
> val microseconds = left.microseconds + right.microseconds
> {code}
> We should check whether the addition would produce overflow before adding.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'

2021-02-24 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-34532:
---
Description: 
I noticed the following when running the test suite:

build/sbt "sql/testOnly *SQLQueryTestSuite"
{code}
19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 
6416.0 failed 1 times; aborting job
[info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in 
stage 6476.0 (TID 7789)
java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
{code}
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in 
stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
at java.lang.Math.addExact(Math.java:809)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
at 
org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This likely was caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.
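A Scala sketch of the suggested pre-check (stand-in Interval type; the real 
change would be in IntervalUtils):
{code}
case class Interval(microseconds: Long)

// Detect overflow before performing the raw addition, so the caller can
// handle it instead of an ArithmeticException surfacing mid-query.
def addChecked(left: Interval, right: Interval): Either[String, Interval] = {
  val a = left.microseconds
  val b = right.microseconds
  // Overflow occurs iff the operands share a sign and the sum would flip it.
  val overflows =
    (b > 0 && a > Long.MaxValue - b) || (b < 0 && a < Long.MinValue - b)
  if (overflows) Left("interval microseconds overflow")
  else Right(Interval(a + b))
}
{code}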

  was:
I noticed the following when running test suite:
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in 
stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
at java.lang.Math.addExact(Math.java:809)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
at 
org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This likely was caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.


> IntervalUtils.add() may result in 'long overflow'
> -
>
> Key: SPARK-34532
> URL: https://issues.apache.org/jira/browse/SPARK-34532
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.2
>Reporter: Ted Yu
>Priority: Major
>
> I noticed the following when running the test suite:
> build/sbt "sql/testOnly *SQLQueryTestSuite"
> {code}
> 19:10:17.977 ERROR org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 
> 6416.0 failed 1 times; aborting job
> [info] - postgreSQL/int4.sql (2 seconds, 543 milliseconds)
> 19:10:20.994 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 
> in stage 6476.0 (TID 7789)
> java.lang.ArithmeticException: long overflow
> at java.lang.Math.multiplyExact(Math.java:892)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
> at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
> at 
> 

[jira] [Created] (SPARK-34532) IntervalUtils.add() may result in 'long overflow'

2021-02-24 Thread Ted Yu (Jira)
Ted Yu created SPARK-34532:
--

 Summary: IntervalUtils.add() may result in 'long overflow'
 Key: SPARK-34532
 URL: https://issues.apache.org/jira/browse/SPARK-34532
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.2
Reporter: Ted Yu


I noticed the following when running the test suite:
{code}
19:15:38.255 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in 
stage 14744.0 (TID 16705)
java.lang.ArithmeticException: long overflow
at java.lang.Math.addExact(Math.java:809)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:105)
at org.apache.spark.sql.types.LongExactNumeric$.plus(numerics.scala:104)
at 
org.apache.spark.sql.catalyst.expressions.Add.nullSafeEval(arithmetic.scala:268)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:573)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
{code}
This likely was caused by the following line:
{code}
val microseconds = left.microseconds + right.microseconds
{code}
We should check whether the addition would produce overflow before adding.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences

2021-02-19 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17287403#comment-17287403
 ] 

Ted Yu commented on SPARK-34476:


The basic jsonb test is here:
https://github.com/yugabyte/yugabyte-db/blob/master/java/yb-cql-4x/src/test/java/org/yb/loadtest/TestSpark3Jsonb.java

I am working on adding a get_json_string() function (via a Spark extension) 
which is similar to get_json_object() but expands the last jsonb field using 
'->>' instead of '->'.

> Duplicate referenceNames are given for ambiguousReferences
> --
>
> Key: SPARK-34476
> URL: https://issues.apache.org/jira/browse/SPARK-34476
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ted Yu
>Priority: Major
>
> When running test with Spark extension that converts custom function to json 
> path expression, I saw the following in test output:
> {code}
> 2021-02-19 21:57:24,550 (Time-limited test) [INFO - 
> org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is 
> == Physical Plan ==
> org.apache.spark.sql.AnalysisException: Reference 
> 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: 
> mycatalog.test.person.phone->'key'->1->'m'->2->>'b', 
> mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
> {code}
> Please note the candidates following 'could be' are the same.
> Here is the physical plan for a working query where phone is a jsonb column:
> {code}
> TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], 
> output=[id#6,address#7,key#0])
> +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
>+- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra 
> Scan: test.person
>  - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
>  - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
> {code}
> The difference for the failed query is that it tries to use 
> {code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as 
> part of filter).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences

2021-02-19 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-34476:
---
Description: 
When running a test with a Spark extension that converts a custom function to 
a json path expression, I saw the following in the test output:
{code}
2021-02-19 21:57:24,550 (Time-limited test) [INFO - 
org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is == 
Physical Plan ==
org.apache.spark.sql.AnalysisException: Reference 
'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b', 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
{code}
Please note that the candidates following 'could be' are identical.
Here is the physical plan for a working query where phone is a jsonb column:
{code}
TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], 
output=[id#6,address#7,key#0])
+- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
   +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra 
Scan: test.person
 - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
 - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
{code}
The difference for the failed query is that it tries to use 
{code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as 
part of the filter).

  was:
When running test with Spark extension that converts custom function to json 
path expression, I saw the following in test output:
{code}
2021-02-19 21:57:24,550 (Time-limited test) [INFO - 
org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is == 
Physical Plan ==
org.apache.spark.sql.AnalysisException: Reference 
'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b', 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
{code}
Please note the candidates following 'could be' are the same.
Here is the physical plan for a working query where phone is a jsonb column:
{code}
TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], 
output=[id#6,address#7,key#0])
+- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
   +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra 
Scan: test.person
 - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
 - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
{code}
The difference for the failed query is that it tries to use 
phone->'key'->1->'m'->2->>'b' in the projection (which works as part of filter).


> Duplicate referenceNames are given for ambiguousReferences
> --
>
> Key: SPARK-34476
> URL: https://issues.apache.org/jira/browse/SPARK-34476
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Ted Yu
>Priority: Major
>
> When running a test with a Spark extension that converts a custom function 
> to a json path expression, I saw the following in the test output:
> {code}
> 2021-02-19 21:57:24,550 (Time-limited test) [INFO - 
> org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is 
> == Physical Plan ==
> org.apache.spark.sql.AnalysisException: Reference 
> 'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: 
> mycatalog.test.person.phone->'key'->1->'m'->2->>'b', 
> mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
> {code}
> Please note that the candidates following 'could be' are identical.
> Here is the physical plan for a working query where phone is a jsonb column:
> {code}
> TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], 
> output=[id#6,address#7,key#0])
> +- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
>+- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra 
> Scan: test.person
>  - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
>  - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
> {code}
> The difference for the failed query is that it tries to use 
> {code}phone->'key'->1->'m'->2->>'b'{code} in the projection (which works as 
> part of the filter).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34476) Duplicate referenceNames are given for ambiguousReferences

2021-02-19 Thread Ted Yu (Jira)
Ted Yu created SPARK-34476:
--

 Summary: Duplicate referenceNames are given for ambiguousReferences
 Key: SPARK-34476
 URL: https://issues.apache.org/jira/browse/SPARK-34476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Ted Yu


When running a test with a Spark extension that converts a custom function to 
a json path expression, I saw the following in the test output:
{code}
2021-02-19 21:57:24,550 (Time-limited test) [INFO - 
org.yb.loadtest.TestSpark3Jsonb.testJsonb(TestSpark3Jsonb.java:102)] plan is == 
Physical Plan ==
org.apache.spark.sql.AnalysisException: Reference 
'phone->'key'->1->'m'->2->>'b'' is ambiguous, could be: 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b', 
mycatalog.test.person.phone->'key'->1->'m'->2->>'b'.; line 1 pos 8
{code}
Please note that the candidates following 'could be' are identical.
Here is the physical plan for a working query where phone is a jsonb column:
{code}
TakeOrderedAndProject(limit=2, orderBy=[id#6 ASC NULLS FIRST], 
output=[id#6,address#7,key#0])
+- *(1) Project [id#6, address#7, phone->'key'->1->'m'->2->'b'#12 AS key#0]
   +- BatchScan[id#6, address#7, phone->'key'->1->'m'->2->'b'#12] Cassandra 
Scan: test.person
 - Cassandra Filters: [[phone->'key'->1->'m'->2->>'b' >= ?, 100]]
 - Requested Columns: [id,address,phone->'key'->1->'m'->2->'b']
{code}
The difference for the failed query is that it tries to use 
phone->'key'->1->'m'->2->>'b' in the projection (which works as part of the 
filter).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34017) Pass json column information via pruneColumns()

2021-01-05 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259423#comment-17259423
 ] 

Ted Yu commented on SPARK-34017:


For PushDownUtils#pruneColumns, I am experimenting with the following:
{code}
  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
    // Capture get_json_object(column, path) calls appearing in projections
    val JSONCapture = "get_json_object\\((.*), *(.*)\\)".r
    val jsonRootFields = ArrayBuffer[RootField]()
    // Walk every node of every projection expression; record a synthetic
    // root field per JSON accessor so it survives column pruning
    projects.foreach { _.foreach { f => f.toString match {
      case JSONCapture(column, field) =>
        jsonRootFields += RootField(StructField(column, f.dataType, 
f.nullable),
          derivedFromAtt = false, prunedIfAnyChildAccessed = true)
      case _ => logDebug("else " + f)
    }}}
    val rootFields = SchemaPruning.identifyRootFields(projects, filters) ++ 
jsonRootFields
{code}

> Pass json column information via pruneColumns()
> ---
>
> Key: SPARK-34017
> URL: https://issues.apache.org/jira/browse/SPARK-34017
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushDownUtils#pruneColumns only passes root fields to 
> SupportsPushDownRequiredColumns implementation(s).
> {code}
> 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - 
> org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
> projection List(id#33, address#34, phone#36, get_json_object(phone#36, 
> $.code) AS get_json_object(phone, $.code)#37)
> 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - 
> org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
> StructType(StructField(id,IntegerType,false), 
> StructField(address,StringType,true), StructField(phone,StringType,true))
> {code}
> The first line shows projections and the second line shows the pruned schema.
> We can see that get_json_object(phone#36, $.code) is filtered. This 
> expression retrieves field 'code' from phone json column.
> We should allow json column information to be passed via pruneColumns().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34017) Pass json column information via pruneColumns()

2021-01-05 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-34017:
---
Description: 
Currently PushDownUtils#pruneColumns only passes root fields to 
SupportsPushDownRequiredColumns implementation(s).
{code}
2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) 
AS get_json_object(phone, $.code)#37)
2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
StructType(StructField(id,IntegerType,false), 
StructField(address,StringType,true), StructField(phone,StringType,true))
{code}
The first line shows projections and the second line shows the pruned schema.

We can see that get_json_object(phone#36, $.code) is filtered out. This 
expression retrieves the field 'code' from the phone json column.

We should allow json column information to be passed via pruneColumns().

  was:

Currently PushDownUtils#pruneColumns only passes root fields to 
SupportsPushDownRequiredColumns implementation(s).

2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) 
AS get_json_object(phone, $.code)#37)
2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
StructType(StructField(id,IntegerType,false), 
StructField(address,StringType,true), StructField(phone,StringType,true))

The first line shows projections and the second line shows the pruned schema.

We can see that get_json_object(phone#36, $.code) is filtered. This expression 
retrieves field 'code' from phone json column.

We should allow json column information to be passed via pruneColumns().


> Pass json column information via pruneColumns()
> ---
>
> Key: SPARK-34017
> URL: https://issues.apache.org/jira/browse/SPARK-34017
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushDownUtils#pruneColumns only passes root fields to 
> SupportsPushDownRequiredColumns implementation(s).
> {code}
> 2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - 
> org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
> projection List(id#33, address#34, phone#36, get_json_object(phone#36, 
> $.code) AS get_json_object(phone, $.code)#37)
> 2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - 
> org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
> StructType(StructField(id,IntegerType,false), 
> StructField(address,StringType,true), StructField(phone,StringType,true))
> {code}
> The first line shows projections and the second line shows the pruned schema.
> We can see that get_json_object(phone#36, $.code) is filtered out. This 
> expression retrieves the field 'code' from the phone json column.
> We should allow json column information to be passed via pruneColumns().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34017) Pass json column information via pruneColumns()

2021-01-05 Thread Ted Yu (Jira)
Ted Yu created SPARK-34017:
--

 Summary: Pass json column information via pruneColumns()
 Key: SPARK-34017
 URL: https://issues.apache.org/jira/browse/SPARK-34017
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.1
Reporter: Ted Yu



Currently PushDownUtils#pruneColumns only passes root fields to 
SupportsPushDownRequiredColumns implementation(s).

2021-01-05 19:36:07,437 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
projection List(id#33, address#34, phone#36, get_json_object(phone#36, $.code) 
AS get_json_object(phone, $.code)#37)
2021-01-05 19:36:07,438 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] nested schema 
StructType(StructField(id,IntegerType,false), 
StructField(address,StringType,true), StructField(phone,StringType,true))

The first line shows projections and the second line shows the pruned schema.

We can see that get_json_object(phone#36, $.code) is filtered out. This 
expression retrieves the field 'code' from the phone json column.

We should allow json column information to be passed via pruneColumns().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-33997) Running bin/spark-sql gives NoSuchMethodError

2021-01-04 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved SPARK-33997.

Resolution: Cannot Reproduce

Rebuilt Spark locally and the error was gone.

> Running bin/spark-sql gives NoSuchMethodError
> -
>
> Key: SPARK-33997
> URL: https://issues.apache.org/jira/browse/SPARK-33997
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> I ran 'mvn install -Phive -Phive-thriftserver -DskipTests'
> Running bin/spark-sql gives the following error:
> {code}
> 21/01/05 00:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> Exception in thread "main" java.lang.NoSuchMethodError: 
> org.apache.spark.sql.internal.SharedState$.loadHiveConfFile$default$3()Lscala/collection/Map;
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:934)
> {code}
> Scala version 2.12.10



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33997) Running bin/spark-sql gives NoSuchMethodError

2021-01-04 Thread Ted Yu (Jira)
Ted Yu created SPARK-33997:
--

 Summary: Running bin/spark-sql gives NoSuchMethodError
 Key: SPARK-33997
 URL: https://issues.apache.org/jira/browse/SPARK-33997
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.1
Reporter: Ted Yu


I ran 'mvn install -Phive -Phive-thriftserver -DskipTests'

Running bin/spark-sql gives the following error:
{code}
21/01/05 00:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.spark.sql.internal.SharedState$.loadHiveConfFile$default$3()Lscala/collection/Map;
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:934)
{code}
Scala version 2.12.10



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column

2021-01-02 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17257647#comment-17257647
 ] 

Ted Yu commented on SPARK-33915:


Here is sample code for capturing the column and fields in downstream 
PredicatePushDown.scala
{code}
  // Matches the quoted attribute form `GetJsonObject(column#id, $.path)`
  private val JSONCapture = "`GetJsonObject\\((.*),(.*)\\)`".r
  private def transformGetJsonObject(p: Predicate): Predicate = {
    val eq = p.asInstanceOf[sources.EqualTo]
    eq.attribute match {
      case JSONCapture(column, field) =>
        // Strip the expression id suffix, e.g. "phone#36" -> "phone"
        val colName = column.split("#")(0)
        // Turn the json path "$.a.b" into "->'a'->'b'" (the slice below
        // drops the leading "$" segment)
        val names = field.split("\\.").foldLeft(List[String]()){(z, n) 
=> z :+ "->'"+n+"'" }
        sources.EqualTo(colName + names.slice(1, names.size).mkString(""), 
eq.value).asInstanceOf[Predicate]
      case _ => // placeholder for predicates this sketch doesn't handle
        sources.EqualTo("foo", "bar").asInstanceOf[Predicate]
    }
  }
{code}

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Assignee: Apache Spark
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb 
> expressions.
> Example of a json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If a non-string literal is part of the expression, the presence of cast() 
> would complicate the situation.
> The implication is that an implementation of SupportsPushDownFilters doesn't 
> have a chance to perform pushdown even if a third-party DB engine supports 
> json expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expressions to be recognized as pushable columns.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-33915) Allow json expression to be pushable column

2021-01-02 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-33915:
---
Comment: was deleted

(was: Opened https://github.com/apache/spark/pull/30984)

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Assignee: Apache Spark
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb 
> expressions.
> Example of a json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If a non-string literal is part of the expression, the presence of cast() 
> would complicate the situation.
> The implication is that an implementation of SupportsPushDownFilters doesn't 
> have a chance to perform pushdown even if a third-party DB engine supports 
> json expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expressions to be recognized as pushable columns.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33953) readImages test fails due to NoClassDefFoundError for ImageTypeSpecifier

2020-12-31 Thread Ted Yu (Jira)
Ted Yu created SPARK-33953:
--

 Summary: readImages test fails due to NoClassDefFoundError for 
ImageTypeSpecifier
 Key: SPARK-33953
 URL: https://issues.apache.org/jira/browse/SPARK-33953
 Project: Spark
  Issue Type: Test
  Components: ML
Affects Versions: 3.0.1
Reporter: Ted Yu


From https://github.com/apache/spark/pull/30984/checks?check_run_id=1630709203 :
{code}
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 1 in stage 21.0 failed 1 times, most recent failure: Lost task 1.0 in 
stage 21.0 (TID 20) (fv-az212-589.internal.cloudapp.net executor driver): 
java.lang.NoClassDefFoundError: Could not initialize class 
javax.imageio.ImageTypeSpecifier
[info]  at 
com.sun.imageio.plugins.png.PNGImageReader.getImageTypes(PNGImageReader.java:1531)
[info]  at 
com.sun.imageio.plugins.png.PNGImageReader.readImage(PNGImageReader.java:1318)
{code}
It seems some dependency is missing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column

2020-12-31 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17257025#comment-17257025
 ] 

Ted Yu commented on SPARK-33915:


Opened https://github.com/apache/spark/pull/30984

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column

2020-12-31 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389
 ] 

Ted Yu edited comment on SPARK-33915 at 12/31/20, 3:16 PM:
---

Here is the plan prior to predicate pushdown:
{code}
2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- Filter (get_json_object(phone#37, $.code) = 1200)
  +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]
{code}
Here is the plan with pushdown:
{code}
2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) 
AS phone#33]
   +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: [[phone->'code' = ?, 1200]]
 - Requested Columns: [id,address,phone]

{code}


was (Author: yuzhih...@gmail.com):
Here is the plan prior to predicate pushdown:
{code}
2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- Filter (get_json_object(phone#37, $.phone) = 1200)
  +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]
{code}
Here is the plan with pushdown:
{code}
2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) 
AS phone#33]
   +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: [[phone->'phone' = ?, 1200]]
 - Requested Columns: [id,address,phone]

{code}

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column

2020-12-29 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389
 ] 

Ted Yu edited comment on SPARK-33915 at 12/29/20, 6:51 PM:
---

Here is the plan prior to predicate pushdown:
{code}
2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- Filter (get_json_object(phone#37, $.phone) = 1200)
  +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]
{code}
Here is the plan with pushdown:
{code}
2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) 
AS phone#33]
   +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: [[phone->'phone' = ?, 1200]]
 - Requested Columns: [id,address,phone]

{code}


was (Author: yuzhih...@gmail.com):
Here is the plan prior to predicate pushdown:
{code}
2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- Filter (get_json_object(phone#37, $.phone) = 1200)
  +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]
{code}
Here is the plan with pushdown:
{code}
2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: [["`GetJsonObject(phone#37,$.phone)`" = ?, 1200]]
 - Requested Columns: [id,address,phone]
{code}

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column

2020-12-28 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255631#comment-17255631
 ] 

Ted Yu commented on SPARK-33915:


[~codingcat] [~XuanYuan] [~viirya] [~Alex Herman]
Can you provide your comments?

Thanks

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column

2020-12-27 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255389#comment-17255389
 ] 

Ted Yu commented on SPARK-33915:


Here is the plan prior to predicate pushdown:
{code}
2020-12-26 03:28:59,926 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- Filter (get_json_object(phone#37, $.phone) = 1200)
  +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: []
 - Requested Columns: [id,address,phone]
{code}
Here is the plan with pushdown:
{code}
2020-12-28 01:40:08,150 (Time-limited test) [DEBUG - 
org.apache.spark.internal.Logging.logDebug(Logging.scala:61)] Adaptive 
execution enabled for plan: Sort [id#34 ASC NULLS FIRST], true, 0
+- Project [id#34, address#35, phone#37, get_json_object(phone#37, $.code) AS 
phone#33]
   +- BatchScan[id#34, address#35, phone#37] Cassandra Scan: test.person
 - Cassandra Filters: [["`GetJsonObject(phone#37,$.phone)`" = ?, 1200]]
 - Requested Columns: [id,address,phone]
{code}

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column

2020-12-25 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944
 ] 

Ted Yu edited comment on SPARK-33915 at 12/26/20, 4:14 AM:
---

I have experimented with two patches which allow json expression to be pushable 
column (without the presence of cast).

The first involves duplicating GetJsonObject as GetJsonString where eval() 
method returns UTF8String. FunctionRegistry.scala and functions.scala would 
have corresponding change.
In PushableColumnBase class, new case for GetJsonString is added.

Since code duplication is undesirable and case class cannot be directly 
subclassed, there is more work to be done in this direction.

The second is simpler. The return type of GetJsonObject#eval is changed to 
UTF8String.
I have run through catalyst / sql core unit tests which passed.
In PushableColumnBase class, new case for GetJsonObject is added.

In test output, I observed the following (for second patch, output for first 
patch is similar):
{code}
Post-Scan Filters: (get_json_object(phone#37, $.code) = 1200)
{code}
Comment on the two approaches (and other approach) is welcome.

Below is snippet for second approach.
{code}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c22b68890a..8004ddd735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression)

   @transient private lazy val parsedPath = 
parsePath(path.eval().asInstanceOf[UTF8String])

-  override def eval(input: InternalRow): Any = {
+  override def eval(input: InternalRow): UTF8String = {
 val jsonStr = json.eval(input).asInstanceOf[UTF8String]
 if (jsonStr == null) {
   return null
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e4f001d61a..1cdc2642ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -723,6 +725,12 @@ abstract class PushableColumnBase {
 }
   case s: GetStructField if nestedPredicatePushdownEnabled =>
 helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)
+  case GetJsonObject(col, field) =>
+Some(Seq("GetJsonObject(" + col + "," + field + ")"))
   case _ => None
 }
 helper(e).map(_.quoted)
{code}


was (Author: yuzhih...@gmail.com):
I have experimented with two patches which allow json expression to be pushable 
column (without the presence of cast).

The first involves duplicating GetJsonObject as GetJsonString where eval() 
method returns UTF8String. FunctionRegistry.scala and functions.scala would 
have corresponding change.
In PushableColumnBase class, new case for GetJsonString is added.

Since code duplication is undesirable and case class cannot be directly 
subclassed, there is more work to be done in this direction.

The second is simpler. The return type of GetJsonObject#eval is changed to 
UTF8String.
I have run through catalyst / sql core unit tests which passed.
In PushableColumnBase class, new case for GetJsonObject is added.

In test output, I observed the following (for second patch, output for first 
patch is similar):
{code}
Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200)
{code}
Comment on the two approaches (and other approach) is welcome.

Below is snippet for second approach.
{code}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c22b68890a..8004ddd735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression)

   @transient private lazy val parsedPath = 
parsePath(path.eval().asInstanceOf[UTF8String])

-  override def eval(input: InternalRow): Any = {
+  override def eval(input: InternalRow): UTF8String = {
 val jsonStr = json.eval(input).asInstanceOf[UTF8String]
 if (jsonStr == null) {
   return null
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 

[jira] [Comment Edited] (SPARK-33915) Allow json expression to be pushable column

2020-12-25 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944
 ] 

Ted Yu edited comment on SPARK-33915 at 12/26/20, 4:14 AM:
---

I have experimented with two patches which allow json expression to be pushable 
column (without the presence of cast).

The first involves duplicating GetJsonObject as GetJsonString where eval() 
method returns UTF8String. FunctionRegistry.scala and functions.scala would 
have corresponding change.
In PushableColumnBase class, new case for GetJsonString is added.

Since code duplication is undesirable and case class cannot be directly 
subclassed, there is more work to be done in this direction.

The second is simpler. The return type of GetJsonObject#eval is changed to 
UTF8String.
I have run through catalyst / sql core unit tests which passed.
In PushableColumnBase class, new case for GetJsonObject is added.

In test output, I observed the following (for second patch, output for first 
patch is similar):
{code}
Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200)
{code}
Comment on the two approaches (and other approach) is welcome.

Below is snippet for second approach.
{code}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c22b68890a..8004ddd735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression)

   @transient private lazy val parsedPath = 
parsePath(path.eval().asInstanceOf[UTF8String])

-  override def eval(input: InternalRow): Any = {
+  override def eval(input: InternalRow): UTF8String = {
 val jsonStr = json.eval(input).asInstanceOf[UTF8String]
 if (jsonStr == null) {
   return null
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e4f001d61a..1cdc2642ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -723,6 +725,12 @@ abstract class PushableColumnBase {
 }
   case s: GetStructField if nestedPredicatePushdownEnabled =>
 helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)
+  case GetJsonObject(col, field) =>
+Some(Seq("GetJsonObject(" + col + "," + field + ")"))
   case _ => None
 }
 helper(e).map(_.quoted)
{code}


was (Author: yuzhih...@gmail.com):
I have experimented with two patches which allow json expression to be pushable 
column (without the presence of cast).

The first involves duplicating GetJsonObject as GetJsonString where eval() 
method returns UTF8String. FunctionRegistry.scala and functions.scala would 
have corresponding change.
In PushableColumnBase class, new case for GetJsonString is added.

Since code duplication is undesirable and case class cannot be directly 
subclassed, there is more work to be done in this direction.

The second is simpler. The return type of GetJsonObject#eval is changed to 
UTF8String.
I have run through catalyst / sql core unit tests which passed.
In PushableColumnBase class, new case for GetJsonObject is added.

In test output, I observed the following (for second patch, output for first 
patch is similar):

Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200)

Comment on the two approaches (and other approach) is welcome.

Below is snippet for second approach.
{code}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c22b68890a..8004ddd735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression)

   @transient private lazy val parsedPath = 
parsePath(path.eval().asInstanceOf[UTF8String])

-  override def eval(input: InternalRow): Any = {
+  override def eval(input: InternalRow): UTF8String = {
 val jsonStr = json.eval(input).asInstanceOf[UTF8String]
 if (jsonStr == null) {
   return null
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 

[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column

2020-12-25 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254944#comment-17254944
 ] 

Ted Yu commented on SPARK-33915:


I have experimented with two patches which allow json expression to be pushable 
column (without the presence of cast).

The first involves duplicating GetJsonObject as GetJsonString where eval() 
method returns UTF8String. FunctionRegistry.scala and functions.scala would 
have corresponding change.
In PushableColumnBase class, new case for GetJsonString is added.

Since code duplication is undesirable and case class cannot be directly 
subclassed, there is more work to be done in this direction.

The second is simpler. The return type of GetJsonObject#eval is changed to 
UTF8String.
I have run through catalyst / sql core unit tests which passed.
In PushableColumnBase class, new case for GetJsonObject is added.

In test output, I observed the following (for second patch, output for first 
patch is similar):

Post-Scan Filters: (get_json_object(phone#37, $.phone) = 1200)

Comment on the two approaches (and other approach) is welcome.

Below is snippet for second approach.
{code}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c22b68890a..8004ddd735 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -139,7 +139,7 @@ case class GetJsonObject(json: Expression, path: Expression)

   @transient private lazy val parsedPath = 
parsePath(path.eval().asInstanceOf[UTF8String])

-  override def eval(input: InternalRow): Any = {
+  override def eval(input: InternalRow): UTF8String = {
 val jsonStr = json.eval(input).asInstanceOf[UTF8String]
 if (jsonStr == null) {
   return null
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e4f001d61a..1cdc2642ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -723,6 +725,12 @@ abstract class PushableColumnBase {
 }
   case s: GetStructField if nestedPredicatePushdownEnabled =>
 helper(s.child).map(_ :+ s.childSchema(s.ordinal).name)
+  case GetJsonObject(col, field) =>
+Some(Seq("GetJsonObject(" + col + "," + field + ")"))
   case _ => None
 }
 helper(e).map(_.quoted)
{code}

> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33915) Allow json expression to be pushable column

2020-12-25 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated SPARK-33915:
---
Description: 
Currently PushableColumnBase provides no support for json / jsonb expression.

Example of json expression:
{code}
get_json_object(phone, '$.code') = '1200'
{code}
If non-string literal is part of the expression, the presence of cast() would 
complicate the situation.

Implication is that implementation of SupportsPushDownFilters doesn't have a 
chance to perform pushdown even if third party DB engine supports json 
expression pushdown.

This issue is for discussion and implementation of Spark core changes which 
would allow json expression to be recognized as pushable column.

  was:
Currently PushableColumnBase provides no support for json / jsonb expression.

Example of json expression:
get_json_object(phone, '$.code') = '1200'

If non-string literal is part of the expression, the presence of cast() would 
complicate the situation.

Implication is that implementation of SupportsPushDownFilters doesn't have a 
chance to perform pushdown even if third party DB engine supports json 
expression pushdown.

This issue is for discussion and implementation of Spark core changes which 
would allow json expression to be recognized as pushable column.


> Allow json expression to be pushable column
> ---
>
> Key: SPARK-33915
> URL: https://issues.apache.org/jira/browse/SPARK-33915
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Ted Yu
>Priority: Major
>
> Currently PushableColumnBase provides no support for json / jsonb expression.
> Example of json expression:
> {code}
> get_json_object(phone, '$.code') = '1200'
> {code}
> If non-string literal is part of the expression, the presence of cast() would 
> complicate the situation.
> Implication is that implementation of SupportsPushDownFilters doesn't have a 
> chance to perform pushdown even if third party DB engine supports json 
> expression pushdown.
> This issue is for discussion and implementation of Spark core changes which 
> would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33915) Allow json expression to be pushable column

2020-12-25 Thread Ted Yu (Jira)
Ted Yu created SPARK-33915:
--

 Summary: Allow json expression to be pushable column
 Key: SPARK-33915
 URL: https://issues.apache.org/jira/browse/SPARK-33915
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.1
Reporter: Ted Yu


Currently PushableColumnBase provides no support for json / jsonb expression.

Example of json expression:
get_json_object(phone, '$.code') = '1200'

If non-string literal is part of the expression, the presence of cast() would 
complicate the situation.

Implication is that implementation of SupportsPushDownFilters doesn't have a 
chance to perform pushdown even if third party DB engine supports json 
expression pushdown.

This issue is for discussion and implementation of Spark core changes which 
would allow json expression to be recognized as pushable column.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (HBASE-20226) Performance Improvement Taking Large Snapshots In Remote Filesystems

2020-07-27 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/HBASE-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166082#comment-17166082
 ] 

Ted Yu commented on HBASE-20226:


{code}
+if (v1Regions.size() > 0 || v2Regions.size() > 0) {
{code}
It seems the thread pool is only needed when v1Regions.size() + v2Regions.size() > 1.

There are also a few findbugs warnings to be addressed.
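
For illustration, a minimal sketch of the guard discussed above (class and
method names are hypothetical, not the actual patch; maxThreads would come
from hbase.snapshot.thread.pool.max):
{code:java}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ManifestCleanupSketch {
  // Skip pool creation when there are zero or one deletions, since a
  // thread pool only pays off for more than one item.
  static void deleteManifests(List<Runnable> deletions, int maxThreads)
      throws InterruptedException {
    if (deletions.size() <= 1) {
      deletions.forEach(Runnable::run);
      return;
    }
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.min(maxThreads, deletions.size()));
    deletions.forEach(pool::submit);
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.MINUTES);
  }
}
{code}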

> Performance Improvement Taking Large Snapshots In Remote Filesystems
> 
>
> Key: HBASE-20226
> URL: https://issues.apache.org/jira/browse/HBASE-20226
> Project: HBase
>  Issue Type: Improvement
>  Components: snapshots
>Affects Versions: 1.4.0
> Environment: HBase 1.4.0 running on an AWS EMR cluster with the 
> hbase.rootdir set to point to a folder in S3 
>Reporter: Saad Mufti
>Priority: Minor
> Attachments: HBASE-20226..01.patch
>
>
> When taking a snapshot of any table, one of the last steps is to delete the 
> region manifests, which have already been rolled up into a larger overall 
> manifest and thus have redundant information.
> This proposal is to do the deletion in a thread pool bounded by 
> hbase.snapshot.thread.pool.max . For large tables with a lot of regions, the 
> current single threaded deletion is taking longer than all the rest of the 
> snapshot tasks when the Hbase data and the snapshot folder are both in a 
> remote filesystem like S3.
> I have a patch for this proposal almost ready and will submit it tomorrow for 
> feedback, although I haven't had a chance to write any tests yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (HBASE-24137) The max merge count of metafixer should be configurable in MetaFixer

2020-04-08 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/HBASE-24137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078854#comment-17078854
 ] 

Ted Yu commented on HBASE-24137:


Please find people who have touched this class to review the patch.
I may not have time to review it.

> The max merge count of metafixer should be configurable in MetaFixer
> 
>
> Key: HBASE-24137
> URL: https://issues.apache.org/jira/browse/HBASE-24137
> Project: HBase
>  Issue Type: Improvement
>Affects Versions: 3.0.0
>Reporter: Yu Wang
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: 24137_master_1.patch
>
>
> The max merge count of metafixer should be configurable in MetaFixer



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-07 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032739#comment-17032739
 ] 

Ted Yu commented on KAFKA-9504:
---

It seems that closing the metrics is not enough to prevent the memory leak:
{code}
Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", 
firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
{code}
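
A small diagnostic sketch (not a fix) that lists which consumer MBeans survive
close(); the kafka.consumer domain is the default for client consumer metrics:
{code:java}
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LeftoverBeans {
  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Print every MBean still registered under the consumer metrics domain.
    Set<ObjectName> names =
        server.queryNames(new ObjectName("kafka.consumer:*"), null);
    names.forEach(System.out::println);
  }
}
{code}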

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() called on a KafkaConsumer some registered MBeans are not 
> unregistered causing leak.
>  
>  
> {code:java}
> import static 
> org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> public class Leaker {
>  private static String bootstrapServers = "hostname:9092";
>  
>  public static void main(String[] args) throws InterruptedException {
>   MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>   Map props = new HashMap<>();
>   props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>  
>   int beans = mBeanServer.getMBeanCount();
>   for (int i = 0; i < 100; i++) {
>KafkaConsumer consumer = new KafkaConsumer<>(props, new 
> ByteArrayDeserializer(), new ByteArrayDeserializer());
>consumer.close();
>   }
>   int newBeans = mBeanServer.getMBeanCount();
>   System.out.println("\nbeans delta: " + (newBeans - beans));
>  }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-24 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved KAFKA-9471.
---
Resolution: Duplicate

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-24 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022970#comment-17022970
 ] 

Ted Yu commented on KAFKA-9471:
---

[~vitojeng]
Please let me know if I should proceed with this, or do you plan to include the
exception throwing in KIP-216?
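
For reference, a minimal sketch of the proposed behavior (the surrounding
provider code is elided; the exception message is illustrative):
{code:java}
import org.apache.kafka.streams.errors.InvalidStateStoreException;

class DeadThreadCheckSketch {
  // Instead of returning an empty list for a DEAD thread, surface a
  // non-retriable error to the caller.
  static void checkThreadState(final boolean threadIsDead) {
    if (threadIsDead) {
      throw new InvalidStateStoreException(
          "State store is unavailable because the stream thread is DEAD");
    }
  }
}
{code}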

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022595#comment-17022595
 ] 

Ted Yu commented on KAFKA-9471:
---

[~vitojeng]
Looks like the following test depends on the empty collection for the DEAD state:
{code}
@Test
public void shouldAllowToQueryAfterThreadDied() throws Exception {
{code}
It fails when the exception is thrown instead.
What do you think?

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Comment: was deleted

(was: I will send out a PR soon along the above line of thinking.)

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Summary: Throw exception for DEAD StreamThread.State  (was: Return empty 
collection for PENDING_SHUTDOWN)

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Description: 
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
If user cannot retry anymore, we should throw exception which is handled in the 
else block.

  was:
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN is precursor to DEAD state.
PENDING_SHUTDOWN should be treated the same way as DEAD.

This makes more sense than current behavior of throwing exception for 
PENDING_SHUTDOWN.


> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If user cannot retry anymore, we should throw exception which is handled in 
> the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022589#comment-17022589
 ] 

Ted Yu commented on KAFKA-9471:
---

I will send out a PR soon along the above line of thinking.

> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is precursor to DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than current behavior of throwing exception for 
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9471:
-

 Summary: Return empty collection for PENDING_SHUTDOWN
 Key: KAFKA-9471
 URL: https://issues.apache.org/jira/browse/KAFKA-9471
 Project: Kafka
  Issue Type: Improvement
Reporter: Ted Yu
Assignee: Ted Yu


In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN should be treated the same way as DEAD.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Description: 
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN is precursor to DEAD state.
PENDING_SHUTDOWN should be treated the same way as DEAD.

This makes more sense than current behavior of throwing exception for 
PENDING_SHUTDOWN.

  was:
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN should be treated the same way as DEAD.


> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is precursor to DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than current behavior of throwing exception for 
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022373#comment-17022373
 ] 

Ted Yu commented on KAFKA-9471:
---

[~mjsax] [~guozhang]
Please comment.

> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is precursor to DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than current behavior of throwing exception for 
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9464) Close the producer in completeShutdown

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved KAFKA-9464.
---
Resolution: Not A Problem

> Close the producer in completeShutdown
> --
>
> Key: KAFKA-9464
> URL: https://issues.apache.org/jira/browse/KAFKA-9464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThread#completeShutdown, the producer (if not null) should be closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException

2020-01-22 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9465:
-

 Summary: Enclose consumer call with catching InvalidOffsetException
 Key: KAFKA-9465
 URL: https://issues.apache.org/jira/browse/KAFKA-9465
 Project: Kafka
  Issue Type: Improvement
Reporter: Ted Yu


In maybeUpdateStandbyTasks, the try block currently encloses only the handling
of the records, not the restoreConsumer.poll call that produces them.
Since InvalidOffsetException is thrown by restoreConsumer.poll, we should
enclose this call in the try block as well.
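
A minimal sketch of the proposed structure (the restore logic is elided and the
method shape is illustrative, not the actual Streams code):
{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;

class StandbyUpdateSketch {
  void maybeUpdateStandbyTasks(final Consumer<byte[], byte[]> restoreConsumer) {
    try {
      // poll() is the call that can throw InvalidOffsetException,
      // so it belongs inside the try block.
      final ConsumerRecords<byte[], byte[]> records =
          restoreConsumer.poll(Duration.ZERO);
      for (final ConsumerRecord<byte[], byte[]> record : records) {
        // ... restore the record into the standby task's state store ...
      }
    } catch (final InvalidOffsetException e) {
      // ... reset the offsets and re-initialize the affected standby tasks ...
    }
  }
}
{code}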



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9464) Close the producer in completeShutdown

2020-01-22 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9464:
-

 Summary: Close the producer in completeShutdown
 Key: KAFKA-9464
 URL: https://issues.apache.org/jira/browse/KAFKA-9464
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu


In StreamThread#completeShutdown, the producer (if not null) should be closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9463) Transient failure in KafkaAdminClientTest.testListOffsets

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9463:
-

 Summary: Transient failure in KafkaAdminClientTest.testListOffsets
 Key: KAFKA-9463
 URL: https://issues.apache.org/jira/browse/KAFKA-9463
 Project: Kafka
  Issue Type: Test
Reporter: Ted Yu


When running tests with Java 11, I got the following test failure:
{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testListOffsets(KafkaAdminClientTest.java:2336)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
for a node assignment.
{code}
KafkaAdminClientTest.testListOffsets passes when it is run alone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9462) Correct exception message in DistributedHerder

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9462:
-

 Summary: Correct exception message in DistributedHerder
 Key: KAFKA-9462
 URL: https://issues.apache.org/jira/browse/KAFKA-9462
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu


There are a few exception messages in DistributedHerder which were copied from
other exception messages.

This task corrects the messages to reflect the actual conditions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020262#comment-17020262
 ] 

Ted Yu commented on KAFKA-9461:
---

Since we may not see the complete record given any size threshold, we can use a
hardcoded value when the limit is added.
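
For illustration, a minimal sketch of that idea (the constant and helper names
are hypothetical, not actual Connect code); a DEBUG call site would then log
abbreviate(record) instead of the record itself:
{code:java}
public class RecordLogging {
  // Hardcoded cap on how many characters of a record value get logged.
  private static final int MAX_LOGGED_VALUE_CHARS = 1024;

  static String abbreviate(final Object value) {
    if (value == null) {
      return "null";
    }
    final String s = value.toString();
    if (s.length() <= MAX_LOGGED_VALUE_CHARS) {
      return s;
    }
    return s.substring(0, MAX_LOGGED_VALUE_CHARS)
        + "... (" + s.length() + " chars total)";
  }
}
{code}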

> Limit DEBUG statement size when logging failed record value
> ---
>
> Key: KAFKA-9461
> URL: https://issues.apache.org/jira/browse/KAFKA-9461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi,
> It is possible with the current implementation that we log a full record 
> content at DEBUG level, which can overwhelmed log4j buffer and OOM it : 
> That stack trace was due to a 70MB messages refused by a broker 
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at 
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuffer.append(StringBuffer.java:270)
> at 
> org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
> at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
> at 
> org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.log(Category.java:856)
> at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
>   in  
> [https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]
> Would it make sense to protect Connect directly in the ConnectRecord 
> toString() method and set a configurable limit ? 
>  Thank you
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9455) Consider using TreeMap for In-memory stores of Streams

2020-01-19 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019191#comment-17019191
 ] 

Ted Yu commented on KAFKA-9455:
---

Maybe we can also look at (profile) the Maps from fastutil, such as:

http://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/Object2ObjectSortedMap.html
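
As a quick point-vs-range probe (class names are from the linked fastutil javadoc; 
a real comparison would use JMH and realistic key distributions):
{code}
import it.unimi.dsi.fastutil.objects.Object2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;

public class FastutilProbe {
    public static void main(String[] args) {
        Object2ObjectSortedMap<String, byte[]> store = new Object2ObjectAVLTreeMap<>();
        for (int i = 0; i < 100_000; i++)
            store.put(String.format("key-%06d", i), new byte[16]);

        byte[] hit = store.get("key-050000");                 // point query (aggregations)
        Object2ObjectSortedMap<String, byte[]> window =
            store.subMap("key-010000", "key-020000");         // range query (windowed joins)
        System.out.println(hit.length + " / " + window.size());
    }
}
{code}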

> Consider using TreeMap for In-memory stores of Streams
> --
>
> Key: KAFKA-9455
> URL: https://issues.apache.org/jira/browse/KAFKA-9455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to 
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap 
> allows us to safely perform range queries without copying over the entire 
> keyset, but the performance on point queries seems to scale noticeably worse 
> with the number of unique keys. Point queries are used by aggregations while 
> range queries are used by windowed joins, but of course both are available 
> within the PAPI and for interactive queries so it's hard to say which we 
> should prefer. Maybe rather than make that tradeoff we should have one 
> version for efficient range queries (a "JoinWindowStore") and one for 
> efficient point queries ("AggWindowStore") - or something. I know we've had 
> similar thoughts for a different RocksDB store layout for Joins (although I 
> can't find that ticket anywhere..), it seems like the in-memory stores could 
> benefit from a special "Join" version as well cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For kafka streams processing logic (i.e. without IQ), it's better to make 
> all processing logic relying on point queries rather than range queries. 
> Right now the only processor that use range queries are, as mentioned above, 
> windowed stream-stream joins. I think we should consider using a different 
> window implementation for this (and as a result also get rid of the 
> retainDuplicate flags) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed as IQ. Depending on its usage 
> frequency I think it makes lots of sense to optimize for single-point queries.
> Of course, even without step 1) we should still consider using tree-map for 
> windowed in-memory stores to have a better scaling effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018602#comment-17018602
 ] 

Ted Yu commented on KAFKA-9450:
---

w.r.t. a separate column family: since the data in this family tends to be small 
compared to the data column family, wouldn't we end up with small files, similar 
to a rocksdb memtable flush?
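
For reference, a minimal sketch of the separate-column-family idea with the RocksDB 
Java API (path and family name are illustrative): each family has its own memtable, 
so the small family flushes independently, which is exactly where the small-files 
question comes from.
{code}
import org.rocksdb.*;
import java.util.*;

public class CfSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        List<ColumnFamilyDescriptor> cfs = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
            new ColumnFamilyDescriptor("offsets".getBytes()));      // the small family
        List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (DBOptions opts = new DBOptions()
                 .setCreateIfMissing(true)
                 .setCreateMissingColumnFamilies(true);
             RocksDB db = RocksDB.open(opts, "/tmp/cf-sketch", cfs, handles)) {
            db.put(handles.get(1), "commit-offset".getBytes(), "42".getBytes());
            // flushing only the small family writes a (small) L0 file for it alone
            db.flush(new FlushOptions().setWaitForFlush(true), handles.get(1));
        }
    }
}
{code}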

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
           countDownLatch.countDown()
         }
       }
-      countDownLatch.await()
+      countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)
       responseQueue.asScala.toBuffer
     }
   }
{code}


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
           countDownLatch.countDown()
         }
       }
-      countDownLatch.await()
+      countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
       responseQueue.asScala.toBuffer
     }
   }
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:22 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
           countDownLatch.countDown()
         }
       }
-      countDownLatch.await()
+      countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
       responseQueue.asScala.toBuffer
     }
   }
{code}


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
           countDownLatch.countDown()
         }
       }
-      countDownLatch.await()
+      countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
       responseQueue.asScala.toBuffer
     }
   }
```

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:21 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
           countDownLatch.countDown()
         }
       }
-      countDownLatch.await()
+      countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
       responseQueue.asScala.toBuffer
     }
   }
```


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 10:42 PM:
-

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute.
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute.
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
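
In Java terms, a minimal sketch of the bounded wait (timeoutMs is an assumed 
value; the point is that the timed await returns false on expiry instead of 
parking the thread forever):
{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedAwait {
    static void awaitBounded(CountDownLatch latch, long timeoutMs)
            throws InterruptedException, TimeoutException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS))
            throw new TimeoutException("handleRequests did not complete within " + timeoutMs + " ms");
    }
}
{code}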

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018352#comment-17018352
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at KafkaController.scala in trunk, I don't see 
Expire.waitUntilProcessingStarted shown in the stack trace.
It seems the class has gone through refactoring / bug fixes.

[~lbdai3190]
If you can attach the server log, that may help us find the root cause.

Thanks

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018175#comment-17018175
 ] 

Ted Yu commented on KAFKA-8532:
---

Created https://github.com/apache/kafka/pull/7978

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM:


How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
         inReadLock(initializationLock) {
           send(request) { response =>
             responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
           }
         }
-      } catch {
-        case e: Throwable =>
-          inFlightRequests.release()
-          throw e
+      } finally {
+        inFlightRequests.release()
+        countDownLatch.countDown()
       }
     }
     countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

I have run through core:test, which passed.

I can send out a PR.


was (Author: yuzhih...@gmail.com):
How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
         inReadLock(initializationLock) {
           send(request) { response =>
             responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
           }
         }
-      } catch {
-        case e: Throwable =>
-          inFlightRequests.release()
-          throw e
+      } finally {
+        inFlightRequests.release()
+        countDownLatch.countDown()
       }
     }
     countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler threads. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846843#comment-16846843
 ] 

Ted Yu commented on KAFKA-5998:
---

Can you move the state directory outside of /tmp, which is subject to cleaning by 
the OS?
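
For example (Kafka Streams config; the path is illustrative), point state.dir at 
a directory that the OS does not reap:
{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rtp-kafkastreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // instead of a /tmp default
        return props;
    }
}
{code}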

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> 09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Resolved] (FLINK-10446) Use the "guava beta checker" plugin to keep off of @Beta API

2019-04-29 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-10446.

Resolution: Won't Fix

> Use the "guava beta checker" plugin to keep off of @Beta API
> 
>
> Key: FLINK-10446
> URL: https://issues.apache.org/jira/browse/FLINK-10446
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Ji Liu
>Priority: Major
>
> The Guava people publish an Error Prone plugin to detect when stuff that's 
> annotated with @Beta gets used. Those things shouldn't be used because the 
> project gives no promises about deprecating before removal.
> plugin:
> https://github.com/google/guava-beta-checker



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:31 PM:


https://pastebin.com/vyvQ8pkF shows what I mentioned earlier.

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.
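
A hedged sketch of that criterion (illustrative names, not the actual 
StateDirectory implementation):
{code}
import java.io.File;

class CleanupCriterion {
    // a task directory is only eligible for cleanup once it has been idle past
    // the cleanup delay AND its task has been committed
    static boolean eligibleForCleanup(File taskDir, long cleanupDelayMs, boolean taskCommitted) {
        long idleMs = System.currentTimeMillis() - taskDir.lastModified();
        return idleMs > cleanupDelayMs && taskCommitted;
    }
}
{code}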


was (Author: yuzhih...@gmail.com):
https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete).

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> 09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at 

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:12 PM:


https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete).

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.


was (Author: yuzhih...@gmail.com):
https://pastebin.com/Y247UZgb shows what I mentioned earlier.
In StateDirectory#cleanRemovedTasks, we don't rely on the last modification time; 
we check whether the task has been committed.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one kafka broker and one kafka streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> 09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu commented on KAFKA-5998:
---

https://pastebin.com/Y247UZgb shows what I mentioned earlier.
In StateDirectory#cleanRemovedTasks, we don't rely on last modification time - 
we check whether the task has been committed.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827089#comment-16827089
 ] 

Ted Yu commented on KAFKA-5998:
---

[~mparthas]:
From your comment on Apr 8th, it seems that making the value for 
"state.cleanup.delay.ms" longer didn't avoid .checkpoint.tmp disappearing.

Did you see log similar to the following prior to the error ?
{code}
2019-04-25 21:03:49,332 INFO [org.apache.kafka.streams.processor.internals.StateDirectory] (application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread) - [short-component-name:; transaction-id:; user-id:; creation-time:] stream-thread [application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread] Deleting obsolete state directory 1_1 for task 1_1 as 813332ms has elapsed (cleanup delay is 60ms).
{code}
If so, that might indicate the new value for "state.cleanup.delay.ms" was still 
short.
If not, there could be some other reason for the problem.

In case the log has been swapped out, please keep an eye out for future occurrences.
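For reference, a sketch of raising the delay through the regular streams properties (the 30-minute value below is only an example, not a recommendation from this ticket):
{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CleanupDelayConfig {
    public static Properties withLongerCleanupDelay(final Properties props) {
        // the default is 600000 (10 minutes); a larger value gives idle task
        // directories more grace time before the CleanupThread removes them
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);
        return props;
    }
}
{code}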

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822564#comment-16822564
 ] 

Ted Yu commented on KAFKA-5998:
---

One doubt I had after reading the recent comments: if the state directory 
is used by more than one process, wouldn't the problem manifest itself in ways 
other than a disappearing checkpoint file (such as a corrupted checkpoint)?

My former comment on reference counting was not refined (pending an answer to the 
above doubt, we may come up with a good solution).

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>  

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822446#comment-16822446
 ] 

Ted Yu commented on KAFKA-5998:
---

I wonder if we can use a file, which records a reference count (for processor 
instances), alongside the checkpoint file.

The checkpoint file would only be deleted when the count reaches zero.
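A self-contained sketch of that idea (the sidecar-file layout and class name are hypothetical, not an existing Kafka API):
{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class RefCountedCheckpoint {
    private final Path checkpoint;
    private final Path refCountFile; // sidecar file holding the reference count

    public RefCountedCheckpoint(final Path checkpoint) {
        this.checkpoint = checkpoint;
        this.refCountFile = Paths.get(checkpoint.toString() + ".refs");
    }

    /** A processor instance starts using the checkpoint. */
    public synchronized void acquire() throws IOException {
        write(read() + 1);
    }

    /** A processor instance is done; delete the checkpoint only once the count reaches zero. */
    public synchronized void release() throws IOException {
        final int count = read() - 1;
        write(count);
        if (count <= 0) {
            Files.deleteIfExists(checkpoint);
            Files.deleteIfExists(refCountFile);
        }
    }

    private int read() throws IOException {
        if (!Files.exists(refCountFile)) {
            return 0;
        }
        return Integer.parseInt(new String(Files.readAllBytes(refCountFile), StandardCharsets.UTF_8).trim());
    }

    private void write(final int count) throws IOException {
        Files.write(refCountFile, Integer.toString(count).getBytes(StandardCharsets.UTF_8));
    }
}
{code}
Note that synchronized only guards threads within one JVM; cross-process safety would additionally need file locking.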

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811681#comment-16811681
 ] 

Ted Yu commented on KAFKA-5998:
---

I made the following change:
{code}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index badaa36cd..be29ebe56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -138,6 +138,7 @@ public abstract class AbstractJoinIntegrationTest {
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
+        STREAMS_CONFIG.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
{code}
For the following check in KafkaStreams#start:
{code}
if (state == State.RUNNING) {
    stateDirectory.cleanRemovedTasks(cleanupDelay);
}
{code}
The state was REBALANCING during TableTableJoinIntegrationTest, so the cleaning 
was not triggered.
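Since the check above only fires once the instance reaches RUNNING, a test that wants to exercise the cleanup path can block until that state. A minimal sketch using the public state listener (not part of the test change above):
{code}
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;

public class AwaitRunning {
    /** Start the instance and block until it transitions to RUNNING. */
    public static void startAndAwaitRunning(final KafkaStreams streams) throws InterruptedException {
        final CountDownLatch running = new CountDownLatch(1);
        // the listener must be registered before start()
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown();
            }
        });
        streams.start();
        running.await();
    }
}
{code}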

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811603#comment-16811603
 ] 

Ted Yu commented on KAFKA-5998:
---

It seems StreamTask can keep track of active tasks. After commit() finishes, 
StreamTask can inform stateDirectory.cleanRemovedTasks that the underlying task 
is subject to cleaning.
This would be more robust than depending on the period of inactivity alone for 
cleaning.
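A self-contained sketch of that handshake (the class and method names are hypothetical; StreamTask and StateDirectory expose no such API today):
{code}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CommitTrackingSketch {
    // task ids whose state is safe to reclaim because a commit has completed
    private final Set<String> cleanableTasks = ConcurrentHashMap.newKeySet();

    /** Called by the task thread after commit() finishes successfully. */
    public void onTaskCommitted(final String taskId) {
        cleanableTasks.add(taskId);
    }

    /** Called when the task becomes active again, so cleanup backs off. */
    public void onTaskResumed(final String taskId) {
        cleanableTasks.remove(taskId);
    }

    /** The cleanup thread consults this instead of relying on inactivity alone. */
    public boolean mayClean(final String taskId) {
        return cleanableTasks.contains(taskId);
    }
}
{code}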

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/6/19 10:55 AM:


Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely (in case the task directory hasn't been modified 
for a while).


was (Author: yuzhih...@gmail.com):
Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551
 ] 

Ted Yu commented on KAFKA-5998:
---

Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the below exception for some of the partitions; I have 16 
> partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu edited comment on KAFKA-3729 at 3/18/19 3:06 AM:


3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.

Note: PR #6461 doesn't require a KIP.
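A rough sketch of the direction (not the patch in 3729.v6.txt itself): a store can pick up the application configs during init() and configure its inner serde from them:
{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;

public abstract class ConfigAwareStore<T> implements StateStore {
    private final Serde<T> serde;

    protected ConfigAwareStore(final Serde<T> serde) {
        this.serde = serde;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // appConfigs() exposes the original application configuration map
        final Map<String, Object> configs = context.appConfigs();
        serde.configure(configs, false);
    }
}
{code}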


was (Author: yuzhih...@gmail.com):
3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: (was: 3729.v6.txt)

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu edited comment on KAFKA-3729 at 3/17/19 6:39 PM:


3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.


was (Author: yuzhih...@gmail.com):
3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.v6.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu commented on KAFKA-3729:
---

3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.v6.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-07 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787312#comment-16787312
 ] 

Ted Yu commented on KAFKA-3729:
---

https://github.com/apache/kafka/pull/6399

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-07 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787272#comment-16787272
 ] 

Ted Yu commented on KAFKA-3729:
---

The following seems to result in compilation warnings:
{code}
/Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:667: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer
sn.getKeySer().configure(config.originals(), true);
              ^
/Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:668: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer
sn.getValueSer().configure(config.originals(), false);
                ^
{code}
I wonder how the warnings can be suppressed.
I checked the existing calls to configure(), which didn't give me much of a clue.
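One way to avoid them (a sketch, not necessarily how the patch resolves it): the warnings come from invoking configure() through a raw Serializer reference. Since configure(Map, boolean) never mentions the type parameter, a wildcard-typed reference compiles cleanly; a targeted @SuppressWarnings("unchecked") is the fallback:
{code}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConfigureWithoutRawTypes {
    public static void main(final String[] args) {
        final Map<String, Object> configs = Collections.emptyMap();

        final Serializer raw = new StringSerializer();          // raw type: unchecked warning below
        raw.configure(configs, true);

        final Serializer<?> wildcard = new StringSerializer();  // wildcard type: no warning
        wildcard.configure(configs, true);
    }
}
{code}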

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16785119#comment-16785119
 ] 

Ted Yu commented on KAFKA-3729:
---

Attached a tentative patch.
If it is on the right track, I can send out a PR.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784755#comment-16784755
 ] 

Ted Yu commented on KAFKA-3729:
---

Can AbstractProcessorContext#appConfigs() be used to obtain the Map which 
configure() uses?
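For illustration, a sketch of what that would look like (assuming a ProcessorContext is available at the call site):
{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SerdeConfigurer {
    /** appConfigs() returns the original application config map, which configure() accepts. */
    public static <T> Serde<T> configureFromContext(final Serde<T> serde,
                                                    final ProcessorContext context,
                                                    final boolean isKey) {
        final Map<String, Object> configs = context.appConfigs();
        serde.configure(configs, isKey);
        return serde;
    }
}
{code}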

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

