[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r567013228

## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ##

@@ -41,8 +43,19 @@ object Serdes {
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
-    new WindowedSerdes.TimeWindowedSerde[T](tSerde)
+  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
+    new JSerdes.WrapperSerde[Windowed[T]](
+      new TimeWindowedSerializer[T](inner.serializer),
+      new TimeWindowedDeserializer[T](inner.deserializer) {
+        override def deserialize(topic: String, data: Array[Byte]): Windowed[T] = {

Review comment:
Hmm, maybe we don't need custom code? I did this implementation back in the fall, before [KIP-616](https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala) was merged. You know Scala better than I do, but I think you're right that the deprecation warning in `TimeWindowedDeserializer` will be enough now that KIP-616 added the `deprecated` annotation to this class.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r566957547

## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala ##

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
Cool, I'm going to go ahead and take the test out. Thanks!
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r566957204

## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ##

@@ -41,8 +43,19 @@ object Serdes {
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
-    new WindowedSerdes.TimeWindowedSerde[T](tSerde)
+  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
+    new JSerdes.WrapperSerde[Windowed[T]](
+      new TimeWindowedSerializer[T](inner.serializer),
+      new TimeWindowedDeserializer[T](inner.deserializer) {
+        override def deserialize(topic: String, data: Array[Byte]): Windowed[T] = {

Review comment:
Yeah, I agree that we don't want to spam the logs. There is the `deprecated` annotation above as well. With that, do we even need the log message for `timeWindowedSerde`?
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564906839

## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala ##

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
This test also appears to fail locally, since it uses the constructor we deprecate in this PR, and from a quick search it doesn't look like Scala suppresses these warnings as easily as Java does. Any thoughts on whether this test is vital, or on a way to make this work?

cc @guozhangwang If I suppress warnings, the test fails (: thoughts on viability?
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564881288

## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala ##

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
Not sure if this is the most robust check. I went through a few different iterations but got type issues with all of them except this one. If someone has a better idea, let me know.
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562709184

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java ##

@@ -56,4 +57,23 @@ public void testWindowedValueDeserializerNoArgConstructors() {
         assertNotNull("Inner deserializer should be not null", inner);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
     }
+
+    @Test
+    public void setWindowSizeThroughConfigs() {
+        props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
+        final TimeWindowedDeserializer deserializer = new TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);

Review comment:
Ah yeah, good catch.
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562699939

## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ##

@@ -235,25 +235,27 @@ public void shouldReduceWindowed() throws Exception {
             .thenComparing(KeyValueTimestamp::value);
         windowedOutput.sort(comparator);
-        final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-        final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+        final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500;
+        final long firstBatchWindowEnd = firstBatchWindowStart + 500;

Review comment:
These are time windows, so it would be `windowSize`, but I didn't write this test; I just updated it to fit the updated deserializer. I can switch it to `windowSize` if you think that would help with readability. WDYT?
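The arithmetic in the hunk above can be looked at in isolation. What follows is a minimal sketch, assuming non-negative millisecond timestamps and windows aligned to multiples of the window size; `WindowBoundsSketch`, `windowStart`, and `windowEnd` are hypothetical names used only for illustration and are not part of the test or of Kafka's API.

```java
// Hypothetical helper, not part of the Kafka code base. It mirrors the
// "timestamp / 500 * 500" expression from the test, with the literal 500
// replaced by a named windowSizeMs parameter.
final class WindowBoundsSketch {

    // Rounds a non-negative timestamp down to a multiple of the window size.
    // Integer division discards the remainder, so multiplying back gives the
    // start of the window that contains the timestamp.
    static long windowStart(final long timestampMs, final long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive");
        }
        return timestampMs / windowSizeMs * windowSizeMs;
    }

    // The window end is simply the start plus the window size.
    static long windowEnd(final long timestampMs, final long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(final String[] args) {
        final long windowSizeMs = 500L;
        final long timestampMs = 1234L;
        // 1234 / 500 = 2 (integer division), so the start is 1000 and the end is 1500.
        System.out.println(windowStart(timestampMs, windowSizeMs));
        System.out.println(windowEnd(timestampMs, windowSizeMs));
    }
}
```

Naming the size once, as the comment suggests with `windowSize`, keeps the start and end computations from drifting apart if the literal ever changes.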
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562698730

## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ##

@@ -327,7 +329,7 @@ public void shouldAggregateWindowed() throws Exception {
         startStreams();
         final List, Integer>> windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(),

Review comment:
The window size has to be set either in the constructor or in the configs, so if we use the generics we have to use the configs, but the general idea was to push users toward the constructor over the configs. Also, in this scenario I wanted to confirm that it would work fine without configs.
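The rule described in this comment (the window size comes from exactly one of two places) can be modeled without Kafka on the classpath. This is a minimal sketch, not the PR's implementation: `WindowSizeResolverSketch` and `resolve` are hypothetical names, and treating "set in both places" and "set in neither" as errors is an assumption made for illustration.

```java
// Hypothetical stand-alone model, not Kafka code. A null argument means
// "not provided" for that source.
final class WindowSizeResolverSketch {

    // Picks the window size from the constructor argument or from the config value.
    // Assumption: providing both, or neither, is rejected.
    static long resolve(final Long fromConstructor, final Long fromConfig) {
        if (fromConstructor != null && fromConfig != null) {
            throw new IllegalArgumentException(
                "Window size was set in both the constructor and the configs");
        }
        if (fromConstructor != null) {
            return fromConstructor;
        }
        if (fromConfig != null) {
            return fromConfig;
        }
        throw new IllegalArgumentException(
            "Window size must be set in the constructor or in the configs");
    }

    public static void main(final String[] args) {
        // Constructor-only and config-only both resolve to the single provided value.
        System.out.println(resolve(500L, null));
        System.out.println(resolve(null, 250L));
    }
}
```

Under these assumptions, a deserializer created through a no-argument constructor has no size of its own and therefore depends entirely on the config value being present.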
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562056000

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ##

@@ -61,6 +61,21 @@ public Long getWindowSize() {
     @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map configs, final boolean isKey) {
+        //check if the config is set and the window size is already set from the constructor
+        final Long configWindowSize;
+        if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {

Review comment:
The console consumer made this check necessary. If there's a simpler way to do this, let me know.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ##

@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer implements Deserializer> {
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
     private Deserializer inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer inner) {
         this(inner, Long.MAX_VALUE);

Review comment:
I thought about it, but figured that since it's deprecated anyway and we want to keep backwards compatibility, I would leave it. WDYT?
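The `instanceof String` check in the first hunk above handles a config value that may arrive as text, which the comment attributes to the console consumer. A minimal stand-alone sketch of that kind of normalization follows; `WindowSizeConfigSketch`, `parseWindowSize`, and the key string `"window.size.ms"` are assumptions for illustration, and the handling of `Long` and unexpected types goes beyond what the hunk shows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model, not the code from the PR.
final class WindowSizeConfigSketch {

    // Assumed key name for illustration; the PR refers to it only through
    // the constant StreamsConfig.WINDOW_SIZE_MS_CONFIG.
    static final String WINDOW_SIZE_KEY = "window.size.ms";

    // Returns the configured window size, or null when the key is absent.
    // Accepts either a String (for example, a value passed as text on a
    // command line) or a Long (a value put into the map programmatically).
    static Long parseWindowSize(final Map<String, ?> configs) {
        final Object raw = configs.get(WINDOW_SIZE_KEY);
        if (raw == null) {
            return null;
        }
        if (raw instanceof String) {
            return Long.parseLong((String) raw);
        }
        if (raw instanceof Long) {
            return (Long) raw;
        }
        throw new IllegalArgumentException(
            "Unsupported type for " + WINDOW_SIZE_KEY + ": " + raw.getClass().getName());
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(WINDOW_SIZE_KEY, "500");   // text form
        System.out.println(parseWindowSize(configs));
        configs.put(WINDOW_SIZE_KEY, 500L);    // already a Long
        System.out.println(parseWindowSize(configs));
    }
}
```

Normalizing the value in one place means the rest of `configure` can work with a `Long` regardless of how the property was supplied.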
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562156019

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ##

@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer implements Deserializer> {
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
     private Deserializer inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer inner) {
         this(inner, Long.MAX_VALUE);

Review comment:
I thought about it, but figured that since it's deprecated anyway and we want to keep backwards compatibility, I would leave it. WDYT?