Apache Kafka – простой пример для продюсера

Давайте создадим приложение для публикации и использования сообщений с помощью клиента Java. Клиент производителя Kafka состоит из следующих API.

API КафкаПроизводитель

Давайте разберемся с наиболее важным набором API производителей Kafka в этом разделе. Центральная часть API KafkaProducer – класс KafkaProducer . Класс KafkaProducer предоставляет возможность подключить брокер Kafka в своем конструкторе с помощью следующих методов.

  • Класс KafkaProducer предоставляет метод send для асинхронной отправки сообщений в тему. Подпись send () выглядит следующим образом
producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord – производитель управляет буфером записей, ожидающих отправки.
  • Обратный вызов – предоставленный пользователем обратный вызов для выполнения, когда запись была подтверждена сервером (ноль означает отсутствие обратного вызова).
  • Класс KafkaProducer предоставляет метод сброса, обеспечивающий фактическое завершение всех ранее отправленных сообщений. Синтаксис метода очистки следующий:
public void flush()
  • Класс KafkaProducer предоставляет метод partitionFor, который помогает в получении метаданных раздела для данной темы. Это может быть использовано для пользовательского разбиения. Суть этого метода заключается в следующем –
public Map metrics()

Возвращает карту внутренних метрик, поддерживаемых производителем.

  • public void close () – класс KafkaProducer предоставляет блоки методов close, пока все ранее отправленные запросы не будут выполнены.

API производителя

Центральной частью API производителя является класс Producer . Класс Producer предоставляет возможность подключить брокер Kafka в своем конструкторе следующими методами.

Класс продюсера

Класс продюсера предоставляет метод send для отправки сообщений в одну или несколько тем с использованием следующих подписей.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Существует два типа производителей – Sync и Async .

Та же конфигурация API применима и к производителю Sync . Разница между ними заключается в том, что производитель синхронизации отправляет сообщения напрямую, но отправляет сообщения в фоновом режиме. Асинхронный производитель предпочтителен, когда вы хотите более высокую пропускную способность. В предыдущих выпусках, таких как 0.8, у асинхронного производителя нет обратного вызова для send () для регистрации обработчиков ошибок. Это доступно только в текущей версии 0.9.

public void close ()

Класс Producer предоставляет метод close для закрытия соединений пула производителей со всеми брокерами Kafka.

Настройки конфигурации

Основные параметры конфигурации API производителя приведены в следующей таблице для лучшего понимания –

S.NoНастройки конфигурации и описание
1client.id
определяет приложение производителя
2producer.type
синхронизация или асинхронность
3acks
Конфигурация acks контролирует критерии по запросам производителя.
4retries
Если запрос производителя не удался, автоматически повторите попытку с указанным значением.
5
bootstrap.servers

загрузочный список брокеров.
6linger.ms
если вы хотите уменьшить количество запросов, вы можете установить для linger.ms нечто большее, чем какое-либо значение.
7key.serializer
Ключ для интерфейса сериализатора.
8value.serializer
значение для интерфейса сериализатора.
9batch.size
Размер буфера.
10buffer.memory
контролирует общий объем памяти, доступной производителю для буферизации.

ProducerRecord API

ProducerRecord – это пара ключ / значение, которая отправляется в кластер Kafka. Конструктор классаProducerRecord для создания записи с парами разделов, ключей и значений с использованием следующей подписи.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic – определенное пользователем имя темы, которое будет добавлено в запись.
  • Partition – количество разделов
  • Key – ключ, который будет включен в запись.
  • Value – запись содержимого
public ProducerRecord (string topic, k key, v value)

Конструктор класса ProducerRecord используется для создания записи с ключом, парами значений и без разделения.

  • Тема – Создать тему для назначения записи.
  • Key – ключ для записи.
  • Value – содержание записи.
public ProducerRecord (string topic, v value)

Класс ProducerRecord создает запись без раздела и ключа.

  • Topic – создать тему.
  • Value – содержание записи.

Методы класса ProducerRecord перечислены в следующей таблице:

S.NoМетоды класса и описание
1public string topic()
Тема будет добавлена ​​в запись.
2public K key()
Ключ, который будет включен в запись. Если такой клавиши нет, значение null будет здесь снова.
3public V value()
Записать содержимое.
4partition()
Количество разделов для записи

Приложение SimpleProducer

Перед созданием приложения сначала запустите ZooKeeper и Kafka broker, затем создайте свою собственную тему в Kafka broker, используя команду create topic. После этого создайте класс Java с именем Sim-pleProducer.java и введите следующую кодировку.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Компиляция – приложение может быть скомпилировано с помощью следующей команды.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Выполнение – приложение может быть выполнено с помощью следующей команды.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Вывод

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Простой потребительский пример

На данный момент мы создали производителя для отправки сообщений в кластер Kafka. Теперь давайте создадим потребителя для потребления сообщений из кластера Kafka. API KafkaConsumer используется для приема сообщений из кластера Kafka. Конструктор класса KafkaConsumer определен ниже.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs – вернуть карту пользовательских конфигов.

Класс KafkaConsumer имеет следующие важные методы, которые перечислены в таблице ниже.

S.NoМетод и описание
1public java.util.Set<TopicPar-tition> assignment()
Получить набор разделов, назначенных в данный момент потребителем.
2public string subscription()
Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.
3public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)
Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.
4public void unsubscribe()
Отписаться от тем из данного списка разделов.
5public void sub-scribe(java.util.List<java.lang.String> topics)
Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы. Если данный список тем пуст, он обрабатывается так же, как и unsubscribe ().
6public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)
Шаблон аргумента относится к шаблону подписки в формате регулярного выражения, а аргумент слушателя получает уведомления из шаблона подписки.
7public void as-sign(java.util.List<TopicParti-tion> partitions)
)
Вручную назначьте список разделов клиенту.
8poll()
Получить данные для тем или разделов, указанных с помощью одного из API подписки / назначения. Это вернет ошибку, если темы не подписаны перед опросом данных.
9public void commitSync()
Смещение коммитов, возвращаемое в последнем опросе () для всех подписанных списков тем и разделов. Та же операция применяется к commitAsyn ().
10public void seek(TopicPartition partition, long offset)
Получите текущее значение смещения, которое потребитель будет использовать в следующем методе poll ().
11public void resume()
Возобновить приостановленные разделы.
12public void wakeup()
Пробуждение потребителя.

ConsumerRecord API

API ConsumerRecord используется для получения записей из кластера Kafka. Этот API состоит из имени темы, номера раздела, из которого принимается запись, и смещения, которое указывает на запись в разделе Kafka. Класс ConsumerRecord используется для создания записи потребителя с определенным именем темы, количеством разделов и парами <ключ, значение>. Имеет следующую подпись.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Тема – Название темы для записи потребителя, полученной из кластера Kafka.
  • Раздел – Раздел по теме.
  • Ключ – ключ записи, если ключа не существует, возвращается ноль.
  • Значение – запись содержимого.

ConsumerRecords API

ConsumerRecords API действует как контейнер для ConsumerRecord. Этот API используется для хранения списка ConsumerRecord для каждого раздела для определенной темы. Его конструктор определен ниже.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition – возвращает карту раздела для определенной темы.
  • Записи – возврат списка ConsumerRecord.

В классе ConsumerRecords определены следующие методы.

S.NoМетоды и описание
1public int count()
Количество записей по всем темам.
2public Set partitions()
Набор разделов с данными в этом наборе записей (если данные не были возвращены, то набор пуст).
3public Iterator iterator()
Итератор позволяет циклически проходить через коллекцию, получать или перемещать элементы.
4public List records()
Получить список записей для данного раздела.

Настройки конфигурации

Параметры конфигурации для основных параметров конфигурации API клиента клиента перечислены ниже –

S.NoНастройки и описание
1bootstrap.servers
Начальный список брокеров.
2group.id
Назначает отдельного потребителя в группу.
3enable.auto.commit
Включите автоматическую фиксацию для смещений, если значение равно true, иначе не зафиксировано.
4auto.commit.interval.ms
Верните, как часто обновленные смещения записываются в ZooKeeper.
5session.timeout.ms
Указывает, сколько миллисекунд Кафка будет ждать, пока ZooKeeper ответит на запрос (чтение или запись), прежде чем отказаться от и продолжать потреблять сообщения.

Приложение SimpleConsumer

Шаги приложения производителя остаются неизменными здесь. Сначала запустите брокера ZooKeeper и Kafka. Затем создайте приложение SimpleConsumer с помощью класса Java с именем SimpleCon-sumer.java и введите следующий код.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Компиляция – приложение может быть скомпилировано с помощью следующей команды.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Выполнение – приложение может быть выполнено с помощью следующей команды

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Вход – откройте CLI производителя и отправьте несколько сообщений в тему. Вы можете поставить ввод smple как «Привет, Потребитель».

Вывод – следующим будет вывод.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Leave a Reply

Your email address will not be published. Required fields are marked *