반응형
package kafka.sw.example;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Minimal example of the ZooKeeper-based high-level Kafka consumer.
 *
 * <p>Requests {@link #NUM_THREADS} streams for {@link #TOPIC}, prints every
 * received message payload to standard output from a fixed thread pool, and
 * shuts everything down after {@link #RUN_DURATION_MS} milliseconds.
 */
public class ConsumerExample {
    /** Topic the example subscribes to. */
    private static final String TOPIC = "test";

    /** Number of streams requested for the topic, and size of the worker pool. */
    private static final int NUM_THREADS = 30;

    /** How long the example keeps consuming before shutting down. */
    private static final long RUN_DURATION_MS = 60000L;

    /** Upper bound on how long to wait for the workers to stop after shutdown. */
    private static final long WORKER_DRAIN_TIMEOUT_SECONDS = 10L;

    /**
     * Connects to Kafka, prints incoming messages for a fixed period, then shuts down.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while sleeping or while
     *                   waiting for the worker threads to finish
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("group.id", "test-group");
        /* Be careful: these ZooKeeper host:port pairs must match your own cluster. */
        props.put("zookeeper.connect", "192.168.0.132:2181,192.168.0.136:2181,192.168.0.134:2181");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        try {
            // Ask for one stream per worker thread on the single topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, NUM_THREADS);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                            // Decode explicitly as UTF-8 so the output does not depend on
                            // the platform default charset.
                            // NOTE(review): assumes producers write UTF-8 text - confirm.
                            System.out.println(
                                    new String(messageAndMetadata.message(), StandardCharsets.UTF_8));
                        }
                    }
                });
            }

            Thread.sleep(RUN_DURATION_MS);
        } finally {
            // Always release the connector and stop accepting work, even when setup or
            // the sleep fails; otherwise the non-daemon workers keep the JVM alive.
            // NOTE(review): shutting down the connector is presumably what ends the
            // blocking stream iteration in the workers - confirm for the Kafka version in use.
            consumer.shutdown();
            executor.shutdown();
        }

        // Give in-flight workers a bounded amount of time to finish printing.
        if (!executor.awaitTermination(WORKER_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        System.out.println("Consumer finish");
    }
}
'IT > BigData, 머신러닝' 카테고리의 다른 글
주식 공부 PER (0) | 2020.05.05 |
---|---|
데이터 스케일링 변환 방법 (0) | 2020.04.19 |
데이터 전처리(preprocessing) 주요 포인트 (0) | 2020.04.13 |
kafka java producer 예제 코드 (0) | 2018.03.22 |
Hive - Create Table & csv file (0) | 2014.06.17 |