Elasticsearch8.x java api 批量插入上万条数据

1.maven依赖
		<dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
        </dependency>
2.批量插入
	@Autowired
    ElasticsearchClient esClient;

	//单批次插入数量
    public static int BATCH_SIZE = 1000;

    //批量插入时最大大小,当此值够大时上面的数量才有空间,单位:byte 10485760 = 10M, 1073741824 = 1G
    public static int BYTE_SIZE = 10485760;

    //并发请求数量
    public static int REQUEST_SIZE = 10;
    
	/**
     * 批量插入
     *
     * @param list
     * @throws IOException
     */
    public boolean bulk2ES(List<EmpiPro> list) {
        try {
            //通过数据总量和单批次插入数量计算并发请求数
            log.info("并发请求数量:{},单批次插入数量:{},最大请求大小:{}", REQUEST_SIZE, BATCH_SIZE, BYTE_SIZE);
            BulkListener<String> listener = getBulkIngester();
            BulkIngester<String> bulkIngester = BulkIngester.of(b -> b
                    .client(esClient)
                    .maxOperations(BATCH_SIZE)
                    .maxSize(BYTE_SIZE)
                    .maxConcurrentRequests(REQUEST_SIZE)
                    .flushInterval(60, TimeUnit.SECONDS)
                    .listener(listener)
            );

            for (int i = 0; i < list.size(); i++) {
                EmpiPro empiPro = list.get(i);
                IndexOperation<EmpiPro> indexOperation = new IndexOperation.Builder<EmpiPro>()
                        // 索引
                        .index(EMPI_INDEX_NAME)
                        // 文档id
                        .id(empiPro.getEmpi())
                        // 文档内容
                        .document(empiPro)
                        .build();
                BulkOperation bulkOperation = new BulkOperation.Builder()
                        .index(indexOperation)
                        .build();
                bulkIngester.add(bulkOperation);
            }
            bulkIngester.close();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * bulk监听器
     * @return
     * @throws Exception
     */
    public BulkListener<String> getBulkIngester() throws Exception {
        BulkListener<String> listener = new BulkListener<String>() {

            /**
             *
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             */
            @Override
            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
                log.info("【beforeBulk】批次[{}】 携带 【{}】 请求数量", executionId, contexts.size());
            }

            /**
             * 批量请求之后调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param response 返回值
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
                log.info("【afterBulk】批次[{}】 提交数据量【{}】 提交结果【{}】", executionId, contexts.size(), response.errors() ? "失败" : "成功");
            }

            /**
             * 当批量请求无法发送到Elasticsearch时调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param failure 异常信息
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
                log.error("Bulk request " + executionId + " failed", failure);
            }
        };
        return listener;
    }
补充es的配置类:
@Configuration
public class EsConfig {

    @Value("${spring.data.elasticsearch.host}")
    private String host;

    @Value("${spring.data.elasticsearch.scheme}")
    private String scheme;

    @Value("${spring.data.elasticsearch.port}")
    private int port;

    @Value("${spring.data.elasticsearch.user}")
    private String userName;

    @Value("${spring.data.elasticsearch.pwd}")
    private String password;

    @Bean
    public ElasticsearchClient elasticsearchClient(){
        //ElasticsearchClient client = new ElasticsearchClient(null);
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //设置账号密码
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        RestClient restClient = RestClient.builder(new HttpHost(host, port,scheme))
                // 异步httpclient配置
                .setHttpClientConfigCallback( httpClientBuilder -> {
                    // 账号密码登录
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    // httpclient保活策略
                    httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));
                    return httpClientBuilder;
                }).build();

        ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());
        // And create the API client
        return new ElasticsearchClient(transport);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882642.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络 - 基础问题】每日 3 题(十九)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…

【Python】curl命令、Api POST导入cURL、python直接使用cURL

文章目录 一、浏览器复制cURL二、API POST直接使用cURL创建接口三、python直接使用cURL构建网络请求四、curl命令详解语法选项实例 cURL是一种命令行工具&#xff0c;常用于通过各种协议&#xff08;如HTTP、HTTPS、FTP等&#xff09;传输数据。它的名字来源于"Client URL…

Python爬虫之requests模块(一)

Python爬虫之requests模块&#xff08;一&#xff09; 学完urllib之后对爬虫应该有一定的了解了&#xff0c;随后就来学习鼎鼎有名的requests模块吧。 一、requests简介。 1、什么是request模块&#xff1f; requests其实就是py原生的一个基于网络请求的模块&#xff0c;模拟…

「漏洞复现」灵当CRM marketing/index.php SQL注入漏洞

0x01 免责声明 请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;作者不为此承担任何责任。工具来自网络&#xff0c;安全性自测&#xff0c;如有侵权请联系删…

python爬虫初体验(一)

文章目录 1. 什么是爬虫&#xff1f;2. 为什么选择 Python&#xff1f;3. 爬虫小案例3.1 安装python3.2 安装依赖3.3 requests请求设置3.4 完整代码 4. 总结 1. 什么是爬虫&#xff1f; 爬虫&#xff08;Web Scraping&#xff09;是一种从网站自动提取数据的技术。简单来说&am…

c++调用python函数进行传参计算和返回

目录 前言&#xff1a;配置环境&#xff1a;基础夯实&#xff1a;1. 初始化Python解释器2. 导入Python模块3. 获取函数对象4. 调用Python函数5. 处理返回值6. 错误处理7. 资源管理8. 编译和链接9. 线程安全性10. 进一步学习 效果展示&#xff1a;实现功能&#xff1a;操作步骤&…

CentOS上使用Mosquitto实现Mqtt主题消息发布和订阅mqtt主题消息连同时间戳记录到文件

场景 CentOS上使用rpm离线安装Mosquitto(Linux上Mqtt协议调试工具)附资源下载&#xff1a; CentOS上使用rpm离线安装Mosquitto(Linux上Mqtt协议调试工具)附资源下载-CSDN博客 上面介绍了mosquitto的离线安装。 如果业务场景中需要订阅某mqtt主题的消息并将收到消息的时间以…

Android轻量级RTSP服务使用场景分析和设计探讨

技术背景 好多开发者&#xff0c;对我们Android平台轻量级RTSP服务模块有些陌生&#xff0c;不知道这个模块具体适用于怎样的场景&#xff0c;有什么优缺点&#xff0c;实际上&#xff0c;我们的Android平台轻量级RTSP服务模块更适用于内网环境下、对并发要求不高的场景&#…

Activiti7《第九式:破气式》——流畅驱动工作流进程。面试题大全

冲冲冲&#xff01;开干 这篇文章将分为九个篇章&#xff0c;带你逐步掌握工作流的核心知识。“破气式”&#xff0c;代表着工作流中的 无形之力&#xff0c;它是贯穿整个流程的 关键驱动 不知不觉已经到了独孤九剑最后一式了&#xff0c;我相信到这里之后各位都已经出神入化…

windows桌面管理软件推荐:一键整理桌面!美化电脑桌面小助手!

windows桌面管理软件推荐来咯&#xff01;在繁忙的工作和生活中&#xff0c;一个整洁、有序的电脑桌面不仅能提升工作效率&#xff0c;还能带来愉悦的视觉体验。然而&#xff0c;随着文件的增多&#xff0c;桌面往往变得杂乱无章。幸运的是&#xff0c;市面上有许多优秀的Windo…

构建高可用和高防御力的云服务架构第五部分:PolarDB(5/5)

引言 云计算与数据库服务 云计算作为一种革命性的技术&#xff0c;已经深刻改变了信息技术行业的面貌。它通过提供按需分配的计算资源&#xff0c;使得数据存储、处理和分析变得更加灵活和高效。在云计算的众多服务中&#xff0c;数据库服务扮演着核心角色。数据库服务不仅负…

​地质图制图规范大全资料分享

我们在《2024年最新测绘地理信息规范在线查看下载》一文整理460个测绘地理信息相关规范的在线查看链接。 现在我们又整理了地质图制图规范大全分享给大家&#xff0c;你可以在文末查看该文档的领取方法。 地质图制图规范大全 这些地质图制图规范来自地质科学数据出版系统&am…

Rustrover2024.2 正式发布:个人非商用免费,泰裤辣

如果这个世界本身 已经足够荒唐 那究竟什么才能算是疯狂 爱情就是这样 一旦错过了 就会有另一个人代替 我们知道 jetbrains 在今年的早些时候正式为 rust 语言发布了专用的 IDE &#xff0c;也就是 rustrover。如今 rustrover 也正式跻身为 jetbrains IDE 系列的一员猛将。…

network request to https://registry.npmjs.org/xxx failed, reason: connect ETIM

目录&#xff1a; 1、问题描述2、解决方案3、npm镜像仓库替换 1、问题描述 npm install 时&#xff0c;报错&#xff1a;npm ERR! network request to https://registry.npmjs.org/postcss-pxtorem failed, reason: connect ETIMEDOU npm ERR! code ETIMEDOUT npm ERR! errno…

COMTRADE 录波文件 | 可视化工具 | 电能质量查看软件

COMTRADE 录波文件 | 可视化工具 | 电能质量查看软件 主要功能介绍 支持 IEEE Std C37.111-1991/1999/2013 规范。读取 ASCII 或二进制 COMTRADE 文件。查看来自 COMTRADE 配置文件的模拟和数字通道列表。将图表导出为 SVG、BMP、JPEG 和 PNG 图形格式。将显示的观察结果以 C…

携手长江存储,构建高性能分布式存储

近年来&#xff0c;《金融科技&#xff08;FinTech&#xff09;发展规划&#xff08;2022-2025 年&#xff09;》《关于银行业保险业数字化转型的指导意见》《金融标准化“十四五”发展规划》等金融监管政策陆续出台&#xff0c;金融机构对于数据基础设施的升级部署&#xff0c…

工业建模设计软件概览与SOLIDWORKS深度解析

在当今快速发展的工业领域&#xff0c;高效的建模设计软件是工程师和设计师不可或缺的工具。这些软件不仅提高了设计的精确度&#xff0c;还大幅缩短了产品从概念到市场的周期。本文将为您介绍当前市场上主流的工业建模设计软件&#xff0c;并重点介绍SOLIDWORKS的优势和应用。…

10.软件工程知识详解上

软件工程概述 软件开发生命周期 软件定义时期&#xff1a;包括可行性研究和详细需求分析过程&#xff0c;任务是确定软件开发工程必须完成的总目标&#xff0c;具体可分成问题定义、可行性研究、需求分析等。软件开发时期&#xff1a;就是软件的设计与实现&#xff0c;可分成…

汽车总线之----FlexRay总线

Introduction 随着汽车智能化发展&#xff0c;车辆开发的ECU数量不断增加&#xff0c;人们对汽车系统的各个性能方面提出了更高的需求&#xff0c;比如更多的数据交互&#xff0c;更高的传输带宽等。现如今人们广泛接受电子功能来提高驾驶安全性&#xff0c;像ABS防抱死系统&a…

git push出错Push cannot contain secrets

报错原因&#xff1a; 因为你的代码里面包含了github token明文信息&#xff0c;github担心你的token会泄漏&#xff0c;所以就不允许你推送这些内容。 解决办法&#xff1a; 需要先把代码里面的github token信息删除掉&#xff0c;并且删掉之前的历史提交&#xff0c;只要包…