[jira] [Assigned] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID

2018-12-06 Thread leesf (JIRA)


 [ https://issues.apache.org/jira/browse/FLINK-10478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

leesf reassigned FLINK-10478:
-----------------------------

Assignee: leesf  (was: vinoyang)

> Kafka Producer wrongly formats "%" for transaction ID
> -----------------------------------------------------
>
> Key: FLINK-10478
> URL: https://issues.apache.org/jira/browse/FLINK-10478
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.2
> Environment: Flink 1.4.2
>   Scala 2.11.12
>   jdk1.8.0_162
>   Running on a local embedded Flink mini cluster (this also happened on a
>   standalone cluster with different code)
> Reporter: Obi Tetsuya
> Assignee: leesf
> Priority: Minor
>
>
> With exactly-once semantics enabled, the Kafka producer uses its task name 
> to build the transaction ID. Because the producer uses that name directly as 
> part of a format string, the job fails whenever the name contains "%".
> Code to reproduce:
> {code:scala}
> object ExampleRunner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     env.enableCheckpointing(1000)
>     env.getConfig.disableSysoutLogging()
>     env.setRestartStrategy(RestartStrategies.noRestart)
>     val p = new java.util.Properties
>     Map(
>       "bootstrap.servers" -> "192.168.1.100:9092",
>       "transaction.timeout.ms" -> (10 * 60 * 1000).toString
>     ).foreach { case (k, v) => p.setProperty(k, v) }
>     env
>       .fromCollection(100 to 200)
>       .map(_.toString)
>       .addSink(new FlinkKafkaProducer011(
>         "test",
>         new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
>         p,
>         Semantic.EXACTLY_ONCE))
>       .name("100%")
>     env.execute()
>   }
> }
> {code}
> Raised exception:
> {code}
> 2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO  o.a.flink.runtime.taskmanager.Task  - Map -> Sink: 100% (1/8) (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED.
> java.util.MissingFormatWidthException: %-%
>   at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040)
>   at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733)
>   at java.util.Formatter.parse(Formatter.java:2560)
>   at java.util.Formatter.format(Formatter.java:2501)
>   at java.util.Formatter.format(Formatter.java:2455)
>   at java.lang.String.format(String.java:2940)
>   at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91)
>   at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72)
>   at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850)
>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
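
The failure mode described in the report can be sketched in isolation, without Flink or Kafka. The class and method names below (`TransactionalIdSketch`, `unsafeId`, `safeId`, `unsafeIdFails`) are hypothetical, and the `prefix + "-%d"` shape of the format string is an assumption inferred from the `%-%` in the exception message, not a copy of `TransactionalIdsGenerator`. The sketch only illustrates why a "%" in the task name breaks `String.format`, and one possible way to avoid it by passing the name as an argument instead.

```java
import java.util.IllegalFormatException;

final class TransactionalIdSketch {

    // Assumed shape of the failing call: the prefix (derived from the task
    // name) becomes part of the format string, so a '%' inside it is parsed
    // as the start of a format specifier.
    static String unsafeId(String prefix, long id) {
        return String.format(prefix + "-%d", id);
    }

    // Possible fix: the format string is a constant and the prefix is passed
    // as an argument, so it is never interpreted as format syntax.
    static String safeId(String prefix, long id) {
        return String.format("%s-%d", prefix, id);
    }

    // Returns true if the unsafe variant rejects the given prefix.
    // MissingFormatWidthException is a subclass of IllegalFormatException.
    static boolean unsafeIdFails(String prefix, long id) {
        try {
            unsafeId(prefix, id);
            return false;
        } catch (IllegalFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String prefix = "Map -> Sink: 100%"; // task name from the report
        System.out.println("unsafe variant fails: " + unsafeIdFails(prefix, 1L));
        System.out.println("safe variant yields:  " + safeId(prefix, 1L));
    }
}
```

With the prefix `Map -> Sink: 100%`, the unsafe variant builds the format string `Map -> Sink: 100%-%d`, in which `%-%` is read as a left-justified literal percent sign with no width. Plain string concatenation (`prefix + "-" + id`) would avoid the problem as well; which approach the actual fix takes is not stated in this thread.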



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID

2018-10-03 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10478:


Assignee: vinoyang
