Hi Tim,

AFAIK we are confusing two things here. The transaction timeout
(transaction.timeout.ms) is how long a transaction may stay open before it
is aborted. What you see here is a different timeout, one that occurs while
creating the transaction in the first place.
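
To make the distinction concrete, here is a minimal sketch in plain Java
(only java.util.Properties, no Kafka client on the classpath). It assumes the
timeout you hit is the one raised by initTransactions(), which according to
the KafkaProducer Javadoc is bounded by max.block.ms and not by
transaction.timeout.ms. The class and method names (ProducerTimeouts,
producerProps, acceptedByBroker) are made up for illustration, and the
broker-side check only mimics the rule that a producer's
transaction.timeout.ms must not exceed the broker's
transaction.max.timeout.ms.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka: it only illustrates
// which config key governs which timeout.
final class ProducerTimeouts {

    /**
     * Builds producer properties with the two timeouts kept separate.
     * transaction.timeout.ms: how long an open transaction may last
     *                         before the coordinator aborts it.
     * max.block.ms:           how long blocking producer calls may wait
     *                         (assumed here to include initTransactions()).
     */
    static Properties producerProps(long transactionTimeoutMs, long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    /**
     * Mimics the broker-side rule: the requested transaction timeout must
     * not be larger than the broker's transaction.max.timeout.ms.
     */
    static boolean acceptedByBroker(Properties props, long brokerMaxTimeoutMs) {
        long requested = Long.parseLong(props.getProperty("transaction.timeout.ms"));
        return requested <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        // 15 minutes requested, broker raised to 1 hour as described in this thread.
        Properties props = producerProps(900_000L, 60_000L);
        System.out.println(acceptedByBroker(props, 3_600_000L)); // prints "true"

        // 1 hour requested against a broker limit of 15 minutes.
        Properties tooLong = producerProps(3_600_000L, 60_000L);
        System.out.println(acceptedByBroker(tooLong, 900_000L)); // prints "false"
    }
}
```

If that assumption holds, raising transaction.timeout.ms or
transaction.max.timeout.ms would not change how long the producer waits
while the transaction is being set up, which would match what you observe.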

A quick Google search turned up [1], from which I'd infer that you need to
set TRANSACTIONAL_ID_CONFIG.

[1] https://github.com/testcontainers/testcontainers-java/issues/1816

On Thu, Nov 12, 2020 at 3:48 PM Tim Josefsson <tim.josefs...@webstep.se>
wrote:

> To further add to this problem, I've now got our ops team to set
> transaction.max.timeout.ms on our Kafka brokers to 1 hour (as suggested
> by the Flink docs). However the problem persists and I'm still getting
> the same error message.
> I've confirmed that this config setting is actually set on the Kafka
> brokers as well to rule out any mistakes there.
>
> Best,
> Tim
>
> On Thu, 12 Nov 2020 at 14:46, Tim Josefsson <tim.josefs...@webstep.se>
> wrote:
>
>> Also realized I had a typo in the config dump I did in the previous email
>> (the one from the 10th). If I don't do
>> Properties producerProps = new Properties();
>> producerProps.setProperty("transaction.timeout.ms", "900000");
>> Then the value reported from the ProducerConfig is 3600000 and not 60000
>> as I had written.
>>
>>
>>
>> On Thu, 12 Nov 2020 at 13:37, Tim Josefsson <tim.josefs...@webstep.se>
>> wrote:
>>
>>> Sure, I've attached it to this email. The process seems to restart once
>>> the TimeoutException happens so it's repeated a couple of times.
>>>
>>> Thanks for looking at it!
>>>
>>> /Tim
>>>
>>> On Wed, 11 Nov 2020 at 10:37, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hmm, could you please post the full stack trace that leads to the
>>>> TimeoutException?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 10.11.20 17:54, Tim Josefsson wrote:
>>>> > Hey Aljoscha,
>>>> >
>>>> > I'm setting transaction.timeout.ms when I create the FlinkKafkaProducer:
>>>> >
>>>> > I create a Properties object, set the property, and finally pass
>>>> > those properties when creating the producer.
>>>> >
>>>> > Properties producerProps = new Properties();
>>>> > producerProps.setProperty("transaction.timeout.ms", "900000");
>>>> >
>>>> > If I don't set that property, I instead get the following config when
>>>> > starting the job:
>>>> > 11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
>>>> >     acks = 1
>>>> >     [omitted for brevity]
>>>> >     transaction.timeout.ms = 60000
>>>> >     transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
>>>> >     value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>>>> >
>>>> > So I imagine the Producer is picking up the change but it still returns
>>>> > errors when running the job.
>>>> >
>>>> > Best regards,
>>>> > Tim
>>>> >
>>>> >
>>>> > On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> >
>>>> >> On 10.11.20 11:53, Tim Josefsson wrote:
>>>> >>> Also when checking my logs I see the following message:
>>>> >>> 11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
>>>> >>>      acks = 1
>>>> >>>      [omitted for brevity]
>>>> >>>      transaction.timeout.ms = 900000
>>>> >>>      transactional.id = Source: Read player events from Kafka -> Map Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not needed for backfill -> Sink: Post events to playerEvents Kafka-a15b4dd4812495cebdc94e33125ef858-1
>>>> >>>      value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>>>> >>
>>>> >> The interesting thing would be to figure out where that
>>>> >> `transaction.timeout.ms = 900000` is coming from. The default from Flink
>>>> >> would be 60000, if nothing is configured. Are you specifying that value,
>>>> >> maybe from the command line or in code?
>>>> >>
>>>> >> Maybe it's a funny coincidence, but our StreamingKafkaITCase also
>>>> >> specifies that timeout value.
>>>> >>
>>>> >> Best,
>>>> >> Aljoscha
>>>> >>
>>>> >>
>>>> >
>>>>
>>>>
>>>
>>> --
>>>
>>> *Tim Josefsson*
>>> [image: Webstep GPtW] <http://www.webstep.se/>
>>> mobil   +46 (0) 707 81 91 12
>>> telefon +46 (0) 8 21 40 70
>>>
>>> tim.josefs...@webstep.se
>>> *webstep.se <http://www.webstep.se/>*
>>> Suttungs gränd 2
>>> 753 19 Uppsala
>>> Stockholm | Uppsala | Malmö | Sundsvall | Oslo
>>> Bergen | Stavanger | Trondheim | Kristiansand
>>> [image: LinkedIn] <http://www.linkedin.com/company/webstep-ab> [image:
>>> Facebook] <http://www.facebook.com/webstepAB> [image: Facebook]
>>> <http://www.instagram.com/webstep_sverige>
>>>
>>
>>
>
>


-- 

Arvid Heise | Senior Java Developer

