Github user kaspersorensen commented on a diff in the pull request: https://github.com/apache/metamodel/pull/178#discussion_r164562943 --- Diff: kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java --- @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.data.DataSet; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.junit.Assert; +import org.junit.Test; + +public class KafkaDataContextTest extends EasyMockSupport { + + @Test + public void testGetSchemaInfo() { + final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class); + + replayAll(); + + final Supplier<Collection<String>> topicSupplier = () -> Arrays.asList("foo", "bar"); + final DataContext dc = new KafkaDataContext<>(String.class, String.class, consumerFactory, topicSupplier); + + verifyAll(); + + Assert.assertEquals("[foo, bar]", dc.getDefaultSchema().getTableNames().toString()); + } + + @Test + public void testQueryWithoutOptimization() { + final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class); + @SuppressWarnings("unchecked") + final Consumer<String, String> consumer = createMock(Consumer.class); + + EasyMock.expect(consumerFactory.createConsumer("myTopic", String.class, String.class)).andReturn(consumer); + + final List<PartitionInfo> partitionInfoList = new ArrayList<>(); + partitionInfoList.add(new PartitionInfo("myTopic", 0, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 1, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 2, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 3, null, null, null)); + + EasyMock.expect(consumer.partitionsFor("myTopic")).andReturn(partitionInfoList); + + final Capture<Collection<TopicPartition>> assignmentCapture = new Capture<>(); + consumer.assign(EasyMock.capture(assignmentCapture)); + consumer.seekToBeginning(EasyMock.anyObject()); + + final ConsumerRecords<String, String> consumerRecords1 = createConsumerRecords(1, 0, 10); + final ConsumerRecords<String, String> consumerRecords2 = createConsumerRecords(2, 20, 10); + + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords1); + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords2); + EasyMock.expect(consumer.poll(1000)).andReturn(null); + + consumer.unsubscribe(); + consumer.close(); + + replayAll(); + + final Supplier<Collection<String>> topicSupplier = () -> Arrays.asList("myTopic"); + final DataContext dc = new KafkaDataContext<>(String.class, String.class, consumerFactory, topicSupplier); + + final DataSet dataSet = dc.query().from("myTopic").select("partition", "offset", "value").where("key").eq( + "key2").execute(); + Assert.assertTrue(dataSet.next()); + Assert.assertEquals("Row[values=[1, 2, value2]]", dataSet.getRow().toString()); --- End diff -- You're not wrong :-) Kafka does give a few guarantees - the main one being that within the same partition, the offsets will be continuously incrementing. So a bit of ordering can be guaranteed, but obviously not across multiple partitions. In this case I want to ensure that all my records are in deed there on the output. Do you think there is a nicer way that I can do that? I'm open for doing it differently, but this seems to me to be the most readable and easy way to do it.
---