卡夫卡
最后修改时间:2023 年 8 月 23 日Kafka 插件可让您监控Kafka事件流处理、创建消费者、生产者和主题。它还允许您连接到架构注册表、创建和更新架构。
提示
在 IntelliJ IDEA 2023.2 之前,Kafka 是大数据工具插件的一部分。在IntelliJ IDEA 2023.2中,插件分为六个插件。如果您在 2023.1 或之前使用过Big Data Tools,那么将 IntelliJ IDEA 更新到 2023.2 后,您将自动安装所有这些新插件,包括Kafka。
安装卡夫卡插件
此功能依赖于Kafka插件,您需要安装并启用该插件。
按打开 IDE 设置,然后选择插件。CtrlAlt0S
打开Marketplace选项卡,找到Kafka插件,然后单击Install(如果出现提示,请重新启动 IDE)。
笔记
如果安装Kafka插件,Big Data Tools Core辅助插件也会自动安装。它有助于与大数据工具的集成 - 例如,它允许您在 Kafka 架构注册表中的 AWS Glue 身份验证与其他 AWS 连接(例如 AWS S3、AWS EMR 或 AWS Glue 监控)之间共享 MFA 和 OAuth 2.0 数据。
使用 Kafka 插件,您可以:
连接到:
如果安装并启用了 Kafka 插件,您可以使用Kafka工具窗口 ( View | Tool Windows | Kafka ) 连接到 Kafka 并使用它。或者,如果安装并启用了远程文件系统或Zeppelin插件,您还可以使用大数据工具工具窗口(视图 | 工具窗口 | 大数据工具)访问 Kafka 连接。
连接到卡夫卡
使用云提供商连接到 Kafka
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
在“名称”字段中,输入连接的名称以区别于其他连接。
在“配置源”列表中,选择“云”,然后在“提供程序”列表中,选择“融合”。
转到https://confluence.cloud/home。在页面右侧,单击设置菜单,选择环境,选择您的集群,然后选择客户端| 爪哇。
在复制客户端的配置片段中,提供 Kafka API 密钥并单击复制。
返回 IDE 并将复制的属性粘贴到“配置”字段中。
填写设置后,单击测试连接以确保所有配置参数正确。然后单击“确定”。
您可以选择设置:
启用连接:如果要禁用此连接,请取消选择。默认情况下,启用新创建的连接。
每个项目:选择仅为当前项目启用这些连接设置。如果您希望此连接在其他项目中可见,请取消选择它。
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
在“名称”字段中,输入连接的名称以区别于其他连接。
在配置源列表中,选择Cloud,然后在提供商列表中选择AWS MSK。
在Bootstrap 服务器字段中,输入 Kafka 代理的 URL 或以逗号分隔的 URL 列表。
在AWS 身份验证列表中,选择身份验证方法。
默认凭证提供者链:使用默认提供者链中的凭证。有关该链的更多信息,请参阅使用默认凭证提供程序链。
来自凭据文件的配置文件:从凭据文件中选择一个配置文件。
显式访问密钥和秘密密钥:手动输入您的凭据。
或者,您可以连接到架构注册表。
如果要在连接到 Kafka 时使用 SSH 隧道,请选择“启用隧道”,然后在SSH 配置列表中选择一项 SSH 配置或创建新配置。
填写设置后,单击测试连接以确保所有配置参数正确。然后单击“确定”。
您可以选择设置:
启用连接:如果要禁用此连接,请取消选择。默认情况下,启用新创建的连接。
每个项目:选择仅为当前项目启用这些连接设置。如果您希望此连接在其他项目中可见,请取消选择它。
连接到自定义 Kafka 服务器
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
在“名称”字段中,输入连接的名称以区别于其他连接。
在配置源列表中,选择自定义。
在Bootstrap 服务器字段中,输入 Kafka 代理的 URL 或以逗号分隔的 URL 列表。
在身份验证下,选择身份验证方法:
None:无需身份验证即可连接。
SASL:选择 SASL 机制(Plain、SCRAM-SHA-256、SCRAM-SHA-512 或Kerberos)并提供您的用户名和密码。
SSL协议
如果要验证代理主机名是否与代理证书中的主机名匹配,请选择验证服务器主机名。清除该复选框相当于添加该
ssl.endpoint.identification.algorithm=
属性。在信任库位置中,提供 SSL 信任库位置(属性)的路径
ssl.truststore.location
。在信任库密码中,提供 SSL 信任库密码(属性)的路径
ssl.truststore.password
。选择使用密钥库客户端身份验证并提供密钥库位置(
ssl.keystore.location
)、密钥库密码(ssl.keystore.password
) 和密钥密码(ssl.key.password
) 的值。
AWS IAM:将 AWS IAM 用于 Amazon MSK。在AWS 身份验证列表中,选择以下选项之一:
默认凭证提供者链:使用默认提供者链中的凭证。有关该链的更多信息,请参阅使用默认凭证提供程序链。
来自凭据文件的配置文件:从凭据文件中选择一个配置文件。
显式访问密钥和秘密密钥:手动输入您的凭据。
或者,您可以连接到架构注册表。
如果要在连接到 Kafka 时使用 SSH 隧道,请选择“启用隧道”,然后在SSH 配置列表中选择一项 SSH 配置或创建新配置。
填写设置后,单击测试连接以确保所有配置参数正确。然后单击“确定”。
您可以选择设置:
启用连接:如果要禁用此连接,请取消选择。默认情况下,启用新创建的连接。
每个项目:选择仅为当前项目启用这些连接设置。如果您希望此连接在其他项目中可见,请取消选择它。
使用属性连接到 Kafka
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
在“名称”字段中,输入连接的名称以区别于其他连接。
在配置源列表中,选择属性。
在Bootstrap 服务器字段中,输入 Kafka 代理的 URL 或以逗号分隔的 URL 列表。
选择提供Kafka Broker配置属性的方式:
隐式:粘贴提供的配置属性。或者,您可以使用 IntelliJ IDEA 提供的代码完成和快速文档手动输入它们。
从文件:选择属性文件。
或者,您可以连接到架构注册表。
如果要在连接到 Kafka 时使用 SSH 隧道,请选择“启用隧道”,然后在SSH 配置列表中选择一项 SSH 配置或创建新配置。
填写设置后,单击测试连接以确保所有配置参数正确。然后单击“确定”。
您可以选择设置:
启用连接:如果要禁用此连接,请取消选择。默认情况下,启用新创建的连接。
每个项目:选择仅为当前项目启用这些连接设置。如果您希望此连接在其他项目中可见,请取消选择它。
建立与 Kafka 服务器的连接后,带有此连接的新选项卡将出现在Kafka工具窗口中。您可以使用它来生成和使用数据、创建和删除主题。如果您连接到架构注册表,您还可以查看、创建和更新架构。
单击Kafka工具窗口的任意选项卡可重命名、删除、禁用或刷新连接,或修改其设置。
所有集群主题都显示在主题部分。您可以单击以仅显示喜爱的主题或显示或隐藏内部主题。单击任何主题即可获取有关该主题的更多详细信息,例如有关分区、配置和架构的信息。
创建主题
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
选择主题并单击(或按)。AltInsert
为新主题命名,指定分区数和复制因子,然后单击“确定”。
笔记
如果您有许多主题,并且希望其中一些主题显示在列表顶部,则可以将它们标记为收藏夹:右键单击主题并选择添加到收藏夹。
从主题中删除记录
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
在主题下,右键单击主题并选择清除主题(或单击其左侧)。单击“确定”确认删除。
生产和消费数据
产生数据
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
选择 Kafka 连接并单击(创建生产者)。
这将在新的编辑器选项卡中打开制作人。
在主题列表中,选择要写入消息的主题。
在“键”和“值”下,选择消息键和值。用于根据所选类型生成随机值。
如果您已连接到架构注册表,则可以选择架构注册表来根据所选架构检查发送的数据。
在headers下,提供任何自定义标头。如果您有 JSON 或 CSV 格式的文件,则可以将它们粘贴到此部分。
在Flow下,您可以控制记录流程:
如果您想同时发送多条记录,请在一次记录中输入一个数字。
如果您希望随机生成记录数据,请选择“生成随机密钥”和“生成随机值” 。
设置发送记录之间的时间间隔(以毫秒为单位)。
如果您希望生产者在达到指定数量的记录或经过指定时间后停止发送消息,请提供停止条件。
在Options下,提供其他选项:
Partition:指定一个主题分区,记录必须发送到该分区。如果未指定,则使用默认逻辑:生产者采用密钥的哈希值对分区数取模。
Compression:为生产者生成的数据选择压缩类型:None、Gzip、Snappy、Lz4或Zstd。
幂等性:如果要确保每条消息的一份副本准确写入流中,请选择此选项。
Acks:如果您希望领导者将记录写入其本地日志并响应而不等待所有追随者的完全确认,请选择领导者。选择全部,让领导者等待完整的同步副本集确认记录。为生产者保留None,以免等待服务器的任何确认。
单击“生成”。
然后,您可以单击“数据”选项卡中的任何记录以显示其详细信息。您也可以单击启用统计功能。
消耗数据
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
选择 Kafka 连接并单击(创建消费者)。
这将在新的编辑器选项卡中打开消费者。
在主题列表中,选择您要订阅的主题。
在“键”和“值”下,选择您要使用的记录的键和值的数据类型。
使用范围和过滤器来缩小消耗数据的范围:
在“开始于”列表中,选择要使用数据的时间段或偏移量。选择从头开始以获取主题中的所有记录。
在“限制”列表中,选择何时停止接收数据,例如,当主题中的记录达到一定数量时。
使用Filter按键、值或标头中的子字符串过滤记录。
在分区下输入分区 ID 或以逗号分隔的 ID 列表,以仅从特定分区获取记录。
单击开始消费。
然后,您可以单击“数据”选项卡中的任何记录以显示其详细信息。您也可以单击启用统计功能。
笔记
要快速创建生产者和消费者,请为这些操作分配快捷方式。按,输入创建 Kafka Producer或创建 Kafka Consumer,然后按。CtrlShift0AAltEnter
保存生产者或消费者预设
如果您经常使用相同的键、值、标头或其他参数生成或使用数据,则可以将它们保存为预设。然后,您可以重复使用预设来快速创建生产者或消费者。
在Kafka工具窗口中,单击(创建生产者) 或(创建消费者)。
指定所需的参数,然后在生产者或消费者创建表单顶部单击(保存预设)。
参数保存为预设,可在“预设”选项卡中找到。单击预设以应用它。
使用架构注册表
生产者和消费者可以使用模式来验证并确保其记录键和值的一致性。Kafka 插件与 Schema Registry 集成,并支持 Avro、Protobuf 和 JSON 模式。它使您能够:
连接到架构注册表
创建、更新、删除和克隆架构
以原始格式或树视图预览模式
比较架构版本
删除架构版本
连接到架构注册表
如果您使用Confluence,则可以将 Broker 和 Schema 注册表属性粘贴到“配置”字段中。
否则,展开架构注册表部分并选择一个提供程序:Confluence或Glue。
汇合胶水URL:输入架构注册表 URL。
配置来源:选择提供连接参数的方式:
自定义:选择身份验证方法并提供凭据。
如果您想使用与 Kafka Broker 不同的 SSL 设置,请清除使用代理 SSL 设置复选框并提供信任库的路径。
属性:粘贴提供的配置属性。或者,您可以使用 IntelliJ IDEA 提供的代码完成和快速文档手动输入属性。
区域:选择架构注册表区域。
AWS 身份验证:选择身份验证方法:
默认凭证提供者链:使用默认提供者链中的凭证。有关该链的更多信息,请参阅使用默认凭证提供程序链。
来自凭据文件的配置文件:从凭据文件中选择一个配置文件。
显式访问密钥和秘密密钥:手动输入您的凭据。
注册表名称:输入要连接的架构注册表的名称,或单击以从列表中选择它。
填写设置后,单击测试连接以确保所有配置参数正确。然后单击“确定”。
创建架构
打开Kafka工具窗口:查看| 工具窗口 | 卡夫卡。
选择架构注册表并单击(或按)。AltInsert
在“格式”列表中,选择架构格式:Avro、Protobuf 或 JSON。
在策略列表中,选择命名策略,并根据所选策略设置名称后缀或选择主题。或者,选择自定义名称并输入任意名称。
提示
如果您使用 AWS Glue Schema Registry,则只能设置架构名称。
提示
基于现有架构创建架构的一种快速方法是克隆它:右键单击架构并选择克隆架构或单击其左侧。
您可以在树视图和原始视图中预览架构。
笔记
如果您有许多模式,并且希望其中一些显示在列表顶部,则可以将它们标记为收藏夹:右键单击模式并选择添加到收藏夹。
比较架构版本
连接到架构注册表时,在架构注册表下选择一个架构。
切换到原始视图并单击“比较”。如果架构具有多个版本,则该按钮可用。
删除架构版本
如果某个架构有多个版本,您可以删除特定版本。架构注册表支持两种类型的删除:软删除(版本删除后架构元数据和 ID 未从注册表中删除)和硬删除(删除所有元数据,包括架构 ID)。选择的能力取决于您是使用 Confluence 还是 AWS Glue Schema Registry:
在Confluence SchemaRegistry中,默认使用软删除。您可以通过选中永久删除复选框来选择使用硬删除。
AWS Glue Schema Registry 始终使用硬删除。
在架构注册表下,选择一个架构。
在其右侧,单击并选择“删除版本”。
提示
要完全删除架构及其所有版本,请右键单击该架构并选择删除架构。
感谢您的反馈意见!