[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-29 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r567013228



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -41,8 +43,19 @@ object Serdes {
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
-    new WindowedSerdes.TimeWindowedSerde[T](tSerde)
+  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
+    new JSerdes.WrapperSerde[Windowed[T]](
+      new TimeWindowedSerializer[T](inner.serializer),
+      new TimeWindowedDeserializer[T](inner.deserializer) {
+        override def deserialize(topic: String, data: Array[Byte]): Windowed[T] = {

Review comment:
   Hmm, maybe we don't need custom code? I did this implementation back in 
the fall, before 
[KIP-616](https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala)
 was merged. You know Scala better than I do, but I think you're right that 
the deprecation warning from `TimeWindowedDeserializer` will be enough, now 
that KIP-616 added the `deprecated` annotation to this class.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-29 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r566957547



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
   Cool, I'm going to go ahead and take the test out. Thanks!









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-29 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r566957204



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -41,8 +43,19 @@ object Serdes {
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
-    new WindowedSerdes.TimeWindowedSerde[T](tSerde)
+  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
+    new JSerdes.WrapperSerde[Windowed[T]](
+      new TimeWindowedSerializer[T](inner.serializer),
+      new TimeWindowedDeserializer[T](inner.deserializer) {
+        override def deserialize(topic: String, data: Array[Byte]): Windowed[T] = {

Review comment:
   Yeah, I agree that we don't want to spam the logs. There is the 
`deprecated` annotation above as well; with that, do we even need the log 
message for the `timeWindowedSerde`?









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-28 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564906839



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
   This test also appears to fail locally since it uses the constructor we 
deprecate in this PR, and from a quick search it doesn't look like Scala 
suppresses these warnings as easily as Java. Any thoughts on whether this test 
is vital, or on a way to make it work? cc @guozhangwang 
   
   If I suppress warnings, the test fails (: thoughts on viability?









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-26 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564906839



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
   This test also appears to fail locally since it uses the constructor we 
deprecate in this PR, and from a quick search it doesn't look like Scala 
suppresses these warnings as easily as Java. Any thoughts on whether this test 
is vital, or on a way to make it work? cc @guozhangwang 









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-26 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564881288



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
   Not sure if this is the most robust check. I went through a few different 
iterations but got type issues with all of them except this one. If someone has 
a better idea, lmk.









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-22 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562709184



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
##
@@ -56,4 +57,23 @@ public void testWindowedValueDeserializerNoArgConstructors() {
         assertNotNull("Inner deserializer should be not null", inner);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
     }
+
+    @Test
+    public void setWindowSizeThroughConfigs() {
+        props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
+        final TimeWindowedDeserializer deserializer = new TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);

Review comment:
   Ah yeah, good catch









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-22 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562699939



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -235,25 +235,27 @@ public void shouldReduceWindowed() throws Exception {
 .thenComparing(KeyValueTimestamp::value);
 
 windowedOutput.sort(comparator);
-final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500;
+final long firstBatchWindowEnd = firstBatchWindowStart + 500;

Review comment:
   These are time windows so it would be `windowSize`, but I didn't write 
this test, just updated it to fit with the updated deserializer. I can switch 
it out to `windowSize` if you think it would help with readability, WDYT?
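
   For reference, a minimal sketch of what the `firstBatchTimestamp / 500 * 500` arithmetic does once the literal is replaced by a named window size. `WindowBounds` and `of` are hypothetical names used only for illustration; they are not part of the test or of Kafka.

```java
// Hypothetical helper, not taken from the PR: computes the bounds of the
// fixed-size window that contains a given timestamp.
final class WindowBounds {
    final long start;
    final long end;

    private WindowBounds(final long start, final long end) {
        this.start = start;
        this.end = end;
    }

    // Integer division truncates, so timestampMs / windowSizeMs * windowSizeMs
    // rounds a non-negative timestamp down to a multiple of windowSizeMs.
    static WindowBounds of(final long timestampMs, final long windowSizeMs) {
        if (timestampMs < 0 || windowSizeMs <= 0) {
            throw new IllegalArgumentException("timestamp must be >= 0 and window size > 0");
        }
        final long start = timestampMs / windowSizeMs * windowSizeMs;
        return new WindowBounds(start, start + windowSizeMs);
    }
}
```

   With a window size of 500, a record at timestamp 1234 lands in the window starting at 1000 and ending at 1500.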









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-22 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562698730



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -327,7 +329,7 @@ public void shouldAggregateWindowed() throws Exception {
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(),

Review comment:
   The window size has to be set either in the constructor or in the 
configs, so if we use the generics (the no-arg `new TimeWindowedDeserializer<>()`) 
we have to use the configs. The general idea, though, was to push users toward 
the constructor rather than the configs. Also, in this scenario I wanted to 
confirm that it would work fine without configs.
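
   A minimal sketch of the either/or rule described above. `WindowSizeSource` and `resolve` are hypothetical names, and the choice to reject the "both set" case is an assumption made for the sketch; the comment itself only says that one of the two sources has to supply the size.

```java
// Hypothetical stand-in for the rule "constructor or configs"; not Kafka's code.
final class WindowSizeSource {

    // Returns the window size from whichever source supplied it.
    // Assumption for this sketch: setting both, or neither, is an error.
    static long resolve(final Long fromConstructor, final Long fromConfig) {
        if (fromConstructor != null && fromConfig != null) {
            throw new IllegalArgumentException(
                "window size set in both the constructor and the configs");
        }
        if (fromConstructor == null && fromConfig == null) {
            throw new IllegalArgumentException(
                "window size must be set in the constructor or in the configs");
        }
        return fromConstructor != null ? fromConstructor : fromConfig;
    }
}
```

   Under that assumption, `resolve(500L, null)` and `resolve(null, 500L)` both yield 500, while passing two values or two nulls throws.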









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562056000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -61,6 +61,21 @@ public Long getWindowSize() {
     @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
+        //check if the config is set and the window size is already set from the constructor
+        final Long configWindowSize;
+        if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {

Review comment:
   The console consumer made this check necessary - if there's a simpler 
way to do this lmk
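
   One possible shape for that check, as a sketch only. It assumes the value can arrive either as a `String` (presumably how the console consumer hands over command-line properties) or as a number set programmatically. `WindowSizeConfigParser` is a hypothetical name, and the key is written out as the literal `window.size.ms`, which is the value of `StreamsConfig.WINDOW_SIZE_MS_CONFIG`, so the block compiles without Kafka on the classpath.

```java
import java.util.Map;

// Hypothetical helper, not the PR's implementation: normalizes the configured
// window size regardless of whether it arrived as a String or as a number.
final class WindowSizeConfigParser {
    // Same value as StreamsConfig.WINDOW_SIZE_MS_CONFIG.
    static final String WINDOW_SIZE_MS = "window.size.ms";

    // Returns the configured window size in milliseconds, or null when unset.
    static Long parse(final Map<String, ?> configs) {
        final Object raw = configs.get(WINDOW_SIZE_MS);
        if (raw == null) {
            return null;
        }
        if (raw instanceof String) {
            // Values given as text, e.g. "500", need explicit parsing.
            return Long.parseLong(((String) raw).trim());
        }
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        throw new IllegalArgumentException(
            "unsupported type for " + WINDOW_SIZE_MS + ": " + raw.getClass().getName());
    }
}
```

   Folding both branches into one helper keeps the `instanceof` test out of `configure` itself; whether that is actually simpler than the inline check is a judgment call.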

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
 
     private Deserializer<T> inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
 
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer<T> inner) {
         this(inner, Long.MAX_VALUE);
Review comment:
   I thought about it but figured since it's deprecated anyway and we want 
to keep backwards compatibility I would leave it. WDYT?









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562156019



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
 
     private Deserializer<T> inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
 
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer<T> inner) {
         this(inner, Long.MAX_VALUE);

Review comment:
   I thought about it but figured since it's deprecated anyway and we want 
to keep backwards compatibility I would leave it. WDYT?









[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562056000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -61,6 +61,21 @@ public Long getWindowSize() {
     @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
+        //check if the config is set and the window size is already set from the constructor
+        final Long configWindowSize;
+        if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {

Review comment:
   The console consumer made this check necessary - if there's a simpler 
way to do this lmk




