[jira] [Assigned] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID
[ https://issues.apache.org/jira/browse/FLINK-10478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf reassigned FLINK-10478: - Assignee: leesf (was: vinoyang) > Kafka Producer wrongly formats "%" for transaction ID > - > > Key: FLINK-10478 > URL: https://issues.apache.org/jira/browse/FLINK-10478 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 > Environment: Flink 1.4.2 > Scala 2.11.12 > jdk1.8.0_162 > Running on local embedded Flink mini cluster (This also happened on a standalone > cluster with different code) >Reporter: Obi Tetsuya >Assignee: leesf >Priority: Minor > > The Kafka Producer with the exactly-once feature uses its task name for the transaction > ID. Because the Producer uses the name directly as a format string, the > job fails if the name contains "%". > Code to reproduce: > {code:scala} > object ExampleRunner { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) > env.enableCheckpointing(1000) > env.getConfig.disableSysoutLogging() > env.setRestartStrategy(RestartStrategies.noRestart) > val p = new java.util.Properties > Map("bootstrap.servers" -> "192.168.1.100:9092", "transaction.timeout.ms" > -> (10 * 60 * 1000).toString).foreach { case (k,v) => p.setProperty(k,v) } > env > .fromCollection(100 to 200) > .map(_.toString) > .addSink(new FlinkKafkaProducer011( > "test", > new KeyedSerializationSchemaWrapper(new SimpleStringSchema), > p, > Semantic.EXACTLY_ONCE)).name("100%") > env.execute() > } > } > {code} > Raised exception: > {code} > 2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO > o.a.flink.runtime.taskmanager.Task - Map -> Sink: 100% (1/8) > (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED. 
> java.util.MissingFormatWidthException: %-% > at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040) > at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733) > at java.util.Formatter.parse(Formatter.java:2560) > at java.util.Formatter.format(Formatter.java:2501) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID
[ https://issues.apache.org/jira/browse/FLINK-10478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10478: Assignee: vinoyang > Kafka Producer wrongly formats "%" for transaction ID > - > > Key: FLINK-10478 > URL: https://issues.apache.org/jira/browse/FLINK-10478 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 > Environment: Flink 1.4.2 > Scala 2.11.12 > jdk1.8.0_162 > Running on local embedded Flink mini cluster (This also happened on a standalone > cluster with different code) >Reporter: Obi Tetsuya >Assignee: vinoyang >Priority: Minor > > The Kafka Producer with the exactly-once feature uses its task name for the transaction > ID. Because the Producer uses the name directly as a format string, the > job fails if the name contains "%". > Code to reproduce: > {code:scala} > object ExampleRunner { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) > env.enableCheckpointing(1000) > env.getConfig.disableSysoutLogging() > env.setRestartStrategy(RestartStrategies.noRestart) > val p = new java.util.Properties > Map("bootstrap.servers" -> "192.168.1.100:9092", "transaction.timeout.ms" > -> (10 * 60 * 1000).toString).foreach { case (k,v) => p.setProperty(k,v) } > env > .fromCollection(100 to 200) > .map(_.toString) > .addSink(new FlinkKafkaProducer011( > "test", > new KeyedSerializationSchemaWrapper(new SimpleStringSchema), > p, > Semantic.EXACTLY_ONCE)).name("100%") > env.execute() > } > } > {code} > Raised exception: > {code} > 2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO > o.a.flink.runtime.taskmanager.Task - Map -> Sink: 100% (1/8) > (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED. 
> java.util.MissingFormatWidthException: %-% > at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040) > at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733) > at java.util.Formatter.parse(Formatter.java:2560) > at java.util.Formatter.format(Formatter.java:2501) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72) > at > org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)