Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
mjsax commented on PR #14605: URL: https://github.com/apache/kafka/pull/14605#issuecomment-1958253922 That weird -- signup should just work. (You definitely don't need to file an ICLA or become a committer :)) -- Maybe it's a temporary glitch -- or something is broken and we need to file a ticket to get it fixed by the infra team (the wiki is for all Apache projects, not just for Kafka). Can you maybe send an email infrastruct...@apache.org as suggested and ask? Feel free to add me as cc (mj...@apache.org). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey commented on PR #14605: URL: https://github.com/apache/kafka/pull/14605#issuecomment-1956760536 Hey @mjsax I seem unable to gain access to Confluence, following the instructions on the KIP page leads me to a page where I cannot sign up, would I need to follow: https://infra.apache.org/new-committers-guide.html#becoming-a-committer ? If so, I can submit the ICLA -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
lihaosky commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1422983101 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Alternatively, you can do something like https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java#L133-L147. This can give you a global param to distinguish them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1408117794 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Sounds good, just to make sure I'm on the right track, here is a code snippet ``` @ParameterizedTest(name = "processor = {0}, context = {1}") @MethodSource(value = "parameters") @SuppressWarnings("unchecked") public void shouldStoreAndReturnStateStores(Object processorObject, ProcessingContext context) { Record input1 = new Record<>("foo", 5L, 0L); Record input2 = new Record<>("bar", 50L, 0L); final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("my-state"), Serdes.String(), Serdes.Long()).withLoggingDisabled(); final KeyValueStore store = storeBuilder.build(); if(processorObject instanceof Processor && context instanceof MockProcessorContext){ Processor processor = ((Processor) processorObject); MockProcessorContext mockProcessorContext = ((MockProcessorContext) context); store.init(mockProcessorContext.getStateStoreContext(), store); processor.init(mockProcessorContext); processor.process(input1); processor.process(input2); } else if(processorObject instanceof FixedKeyProcessor && context instanceof MockFixedKeyProcessorContext){ FixedKeyProcessor processor = ((FixedKeyProcessor) processorObject); MockFixedKeyProcessorContext mockFixedKeyProcessorContext = ((MockFixedKeyProcessorContext) context); store.init(mockFixedKeyProcessorContext.getStateStoreContext(), store); processor.init(mockFixedKeyProcessorContext); processor.process(InternalFixedKeyRecordFactory.create(input1)); processor.process(InternalFixedKeyRecordFactory.create(input2)); } else{ assert false; } assertThat(store.get("foo"), is(5L)); assertThat(store.get("bar"), is(50L)); assertThat(store.get("all"), is(55L)); } ``` Since `FixedKeyProcessor` and `Processor` are not related to one another, this would be a
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1408117794 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Sounds good, just to make sure I'm on the right track, here is a code snippet ``` @ParameterizedTest(name = "processor = {0}, context = {1}") @MethodSource(value = "parameters") @SuppressWarnings("unchecked") public void shouldStoreAndReturnStateStores(Object processorObject, ProcessingContext context) { Record input1 = new Record<>("foo", 5L, 0L); Record input2 = new Record<>("bar", 50L, 0L); final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("my-state"), Serdes.String(), Serdes.Long()).withLoggingDisabled(); final KeyValueStore store = storeBuilder.build(); if(processorObject instanceof Processor && context instanceof MockProcessorContext){ Processor processor = ((Processor) processorObject); MockProcessorContext mockProcessorContext = ((MockProcessorContext) context); store.init(mockProcessorContext.getStateStoreContext(), store); processor.init(mockProcessorContext); processor.process(input1); processor.process(input2); } else if(processorObject instanceof FixedKeyProcessor && context instanceof MockFixedKeyProcessorContext){ FixedKeyProcessor processor = ((FixedKeyProcessor) processorObject); MockFixedKeyProcessorContext mockFixedKeyProcessorContext = ((MockFixedKeyProcessorContext) context); store.init(mockFixedKeyProcessorContext.getStateStoreContext(), store); processor.init(mockFixedKeyProcessorContext); processor.process(InternalFixedKeyRecordFactory.create(input1)); processor.process(InternalFixedKeyRecordFactory.create(input2)); } else{ assert false; } ``` Since `FixedKeyProcessor` and `Processor` are not related to one another, this would be a way to pass in both processors to one test. -- This is an automated message from the Apache Git Service. To respond to the
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
lihaosky commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1407319712 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Right. I think you can parameterize `MockProcessorContextAPITest` so common tests run for both processor context mock and add some tests for `FixedKeyProcessor` only (return directly for other processor context) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1404742800 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Definitely! Do you think I should just add some specific FixedKeyProcessor tests to the MockProcessorContextAPITest file since it is mostly the same? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
lihaosky commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1399755568 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Can you parameterize `MockProcessorContextAPITest`? Most of the code here seems to be the same as what's in `MockProcessorContextAPITest`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey opened a new pull request, #14605: URL: https://github.com/apache/kafka/pull/14605 This change is to introduce the MockFixedKeyProcessorContext, which extends the existing `MockProcessorContext`, and is essential when trying to test a `FixedKeyProcessor` in Unit Tests Testing includes the `MockFixedKeyProcessorContextTest` which brings in the tests from `MockProcessorContextAPITest` however, adhering to fixed key processing (in every test, the key does not change) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org