Перейти к основному содержимому
Перейти к основному содержимому

Соединение Apache NiFi с ClickHouse

Community Maintained

Apache NiFi — это программное обеспечение с открытым исходным кодом для управления потоками данных, разработанное для автоматизации передачи данных между программными системами. Оно позволяет создавать ETL-конвейеры данных и поставляется с более чем 300 процессорами данных. В этом пошаговом руководстве показано, как подключить Apache NiFi к ClickHouse как к источнику, так и к получателю, и загрузить тестовый набор данных.

1. Соберите свои данные для подключения

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS, and the details are available in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

2. Скачайте и запустите Apache NiFi

  1. Для новой установки загрузите двоичный файл с https://nifi.apache.org/download.html и начните с выполнения команды ./bin/nifi.sh start

3. Скачайте драйвер ClickHouse JDBC

  1. Перейдите на страницу релизов драйвера ClickHouse JDBC на GitHub и найдите последнюю версию релиза JDBC
  2. В релизной версии нажмите на "Показать все xx assets" и найдите файл JAR, содержащий ключевые слова "shaded" или "all", например, clickhouse-jdbc-0.5.0-all.jar
  3. Поместите файл JAR в папку, доступную для Apache NiFi, и запомните абсолютный путь

4. Добавьте сервис контроллера DBCPConnectionPool и настройте его свойства

  1. Чтобы настроить сервис контроллера в Apache NiFi, перейдите на страницу конфигурации потока NiFi, нажав на кнопку "шестерёнка"

    Страница конфигурации потока NiFi с выделенной кнопкой шестерёнки
  2. Выберите вкладку Сервисы контроллера и добавьте новый сервис контроллера, нажав на кнопку + в правом верхнем углу

    Вкладка Сервисы контроллера с выделенной кнопкой добавления
  3. Найдите DBCPConnectionPool и нажмите кнопку "Добавить"

    Диалог выбора сервисов контроллера с выделенным DBCPConnectionPool
  4. Новый DBCPConnectionPool по умолчанию будет в недействительном состоянии. Нажмите кнопку "шестерёнка", чтобы начать конфигурацию

    Список сервисов контроллера с недействительным DBCPConnectionPool и выделенной кнопкой шестерёнки
  5. В разделе "Свойства" введите следующие значения

СвойствоЗначениеПримечание
URL соединения с базой данныхjdbc:ch:https://HOSTNAME:8443/default?ssl=trueЗамените HOSTNAME в URL соединения соответственно
Имя класса драйвера базы данныхcom.clickhouse.jdbc.ClickHouseDriver
Место расположения драйвера базы данных/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarАбсолютный путь к JAR-файлу драйвера ClickHouse JDBC
Пользователь базы данныхdefaultИмя пользователя ClickHouse
ПарольpasswordПароль ClickHouse
  1. В разделе Настройки измените имя сервиса контроллера на "ClickHouse JDBC" для удобства

    Диалог конфигурации DBCPConnectionPool с заполненными свойствами
  2. Активируйте сервис контроллера DBCPConnectionPool, нажав кнопку "молния", а затем кнопку "Включить"

    Список сервисов контроллера с выделенной кнопкой молнии

    Диалог подтверждения включения сервиса контроллера
  3. Проверьте вкладку Сервисы контроллера и убедитесь, что сервис контроллера включен

    Список сервисов контроллера с включенным сервисом ClickHouse JDBC

5. Чтение из таблицы с помощью процессора ExecuteSQL

  1. Добавьте процессор ​​ExecuteSQL, а также соответствующие upstream и downstream процессоры

    Канва NiFi с процессором ExecuteSQL в рабочем процессе
  2. В разделе "Свойства" процессора ​​ExecuteSQL введите следующие значения

    СвойствоЗначениеПримечание
    Служба подключения к базе данныхClickHouse JDBCВыберите службу контроллера, настроенную для ClickHouse
    SQL запросSELECT * FROM system.metricsВведите ваш запрос здесь
  3. Запустите процессор ​​ExecuteSQL

    Конфигурация процессора ExecuteSQL с заполненными свойствами
  4. Чтобы убедиться, что запрос был успешно обработан, проверьте один из FlowFile в выходной очереди

    Диалог списка очереди с готовыми для проверки FlowFile
  5. Переключитесь в режим "отформатировано", чтобы просмотреть результат выходного FlowFile

    Просмотр содержимого FlowFile, �показывающий результаты запроса в отформатированном виде

6. Запись в таблицу с помощью процессов MergeRecord и PutDatabaseRecord

  1. Чтобы записать несколько строк в одном вставке, сначала нужно объединить несколько записей в одну. Это можно сделать с помощью процессора MergeRecord

  2. В разделе "Свойства" процессора MergeRecord введите следующие значения

    СвойствоЗначениеПримечание
    Читатель записейJSONTreeReaderВыберите соответствующий читатель записей
    Писатель записейJSONReadSetWriterВыберите соответствующий писатель записей
    Минимальное число записей1000Увеличьте это значение, чтобы минимальное количество строк было объединено в одну запись. По умолчанию 1 строка
    Максимальное число записей10000Увеличьте это значение больше, чем "Минимальное число записей". По умолчанию 1,000 строк
  3. Чтобы подтвердить, что несколько записей объединяются в одну, проверьте входные и выходные данные процессора MergeRecord. Обратите внимание, что выходные данные — это массив нескольких входных записей

    Входные данные

    Входные данные процессора MergeRecord, показывающие отдельные записи

    Выходные данные

    Выходные данные процессора MergeRecord, показывающие объединённый массив записей
  4. В разделе "Свойства" процессора PutDatabaseRecord введите следующие значения

    СвойствоЗначениеПримечание
    Читатель записейJSONTreeReaderВыберите соответствующий читатель записей
    Тип базы данныхGenericОставить по умолчанию
    Тип оператораINSERT
    Служба подключения к базе данныхClickHouse JDBCВыберите службу контроллера ClickHouse
    Имя таблицыtblВведите имя вашей таблицы здесь
    Переводить имена полейfalseУстановите на "false", чтобы имена полей, вставляемые в таблицу, соответствовали именам колонок
    Максимальный размер пакета1000Максимальное количество строк на одну вставку. Это значение не должно быть ниже значения "Минимальное число записей" в процессоре MergeRecord
  5. Чтобы убедиться, что каждая вставка содержит несколько строк, проверьте, что количество строк в таблице увеличивается как минимум на значение "Минимальное число записей", определенное в MergeRecord.

    Результаты запроса, показывающие количество строк в целевой таблице
  6. Поздравляем - вы успешно загрузили ваши данные в ClickHouse с использованием Apache NiFi!