본문 바로가기

IT/BigData, 머신러닝

kafka java consumer 샘플 코드

반응형
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package kafka.sw.example;
 
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
 
public class ConsumerExample {
    private static final String TOPIC = "test";
    private static final int NUM_THREADS = 30;
 
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("group.id""test-group");
        
        /* be careful about address, port !!!!!!!!!!!!!!!!!!!!!!*/
        
        props.put("zookeeper.connect",
                "192.168.0.132:2181,192.168.0.136:2181,192.168.0.134:2181");
        props.put("auto.commit.interval.ms""1000");
        
        ConsumerConfig consumerConfig = new ConsumerConfig(props);        
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        
        
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                        System.out.println(new String(messageAndMetadata.message()));
                    }
                }
            });
        }
        
        Thread.sleep(60000);
        
        consumer.shutdown();
        executor.shutdown();
        System.out.println("Consumer finish");
    }
 
}
 
cs

'IT > BigData, 머신러닝' 카테고리의 다른 글

주식 공부 PER  (0) 2020.05.05
데이터 스케일링 변환 방법  (0) 2020.04.19
데이터 전처리(preprocessing) 주요 포인트  (0) 2020.04.13
kafka java producer 예제 코드  (0) 2018.03.22
Hive - Create Table & csv file  (0) 2014.06.17