Recentemente me deparei com o problema de ter que processar um arquivão (muito comum no dia a dia) e fazer algumas operações. Logo em seguida, salvar o resultado em um indice do elasticsearch. Nesses casos eu recorro sempre ao Apache Spark e pensei será que não existe algum cliente para colocar isso de maneira otimizada no elastic? E como sempre o google me respondeu temos sim e se chama elasticsearch-hadoop. Olhando a Doc, fui testar na prática e em alguns casos tive uns comportamentos inesperados, principalmente utilizando Dataframes.
O Spark tem uma API muito boa para lidar com leitura e escrita de Dataframes, ou seja quero usar dessa forma! Googlando um pouquinho mais acabei encontrando na documentação do Databricks a forma “certa” de ser fazer. Hoje vou compartilhar com vocês este aprendizado, vamos lá!
Quando trabalho no universo JVM recomendo utilizar o sdkman para gerenciar as diversas versões de java, scala, sbt e por aê vai.
# instalando o sdkman
curl -s "https://get.sdkman.io" | bash
# instalando versões da jdk, scala, sbt
sdk install 8.0.275.hs-adpt
sdk use java 8.0.275.hs-adpt
sdk install scala 2.12.16
sdk use scala 2.12.16
sdk install sbt 1.7.1
sdk use sbt 1.7.1
Com o ambiente pronto, agora vamos criar o nosso projeto, e eu recomendo utilizar o comando sbt new ALGUMA_TEMPLATE_QUALQUER.
sbt new scala/scala-seed.g8
Agora que temos a estrutura do projeto criada, vamos compilar para saber se tudo esta ok!
sbt compile
sbt run
Se tudo deu certo iremos visualizar o seguinte output:
$ sbt run
[info] welcome to sbt 1.7.1 (Oracle Corporation Java 17.0.2)
[info] loading global plugins from /Users/MEU_USUARIO/.sbt/1.0/plugins
[info] loading project definition from /Users/MEU_USUARIO/elasticsearch-spark/project
[info] loading settings for project root from build.sbt ...
[info] set current project to lala (in build file:/Users/MEU_USUARIO/elasticsearch-spark/)
[info] running example.Hello
hello
Agora vamos criar os nossos exemplos em scala.
mkdir project
mkdir -p src/main/scala/br/com/rtancman/
touch src/main/scala/br/com/rtancman/ElasticsearchSpark.scala
touch project/plugins.sbt
touch docker-compose.yml
Edite o arquivo src/main/scala/br/com/rtancman/ElasticsearchSpark.scala
com o conteudo abaixo:
package br.com.rtancman
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import scala.collection.JavaConversions._
object ElasticsearchSpark {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("elasticsearch-spark-example-1")
.master("local[*]")
.config("es.nodes", "localhost")
.config("es.port", "9200")
.getOrCreate()
import spark.implicits._
val schemaMovie = StructType(
Array(
StructField("title", StringType,true),
StructField("type", StringType,true)
)
)
val rowMovie= Seq(Row("Matrix", "movie"), Row("John Wick", "movie"))
val df = spark.createDataFrame(rowMovie, schemaMovie)
df.write
.format("org.elasticsearch.spark.sql")
.option("es.resource", "elasticsearch-spark-small-movies")
.option("es.write.operation", "upsert")
.option("es.index.auto.create", "true")
.option("es.nodes.wan.only", "true")
.option("es.mapping.id", "id")
.option("es.spark.dataframe.write.null", "true")
.save()
spark.close()
}
}
Vamos incluir as dependências. Para isto vamos precisar editar 3 arquivos project/Dependencies.scala
, build.sbt
e project/plugins.sbt
com o conteúdo abaixo:
Arquivo project/Dependencies.scala
:
import sbt._
object Dependencies {
lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.2.11"
lazy val spark = "org.apache.spark" %% "spark-core" % "3.2.2"
lazy val sparkSql = "org.apache.spark" %% "spark-sql" % "3.2.2"
lazy val elasticSpark =
"org.elasticsearch" % "elasticsearch-spark-30_2.12" % "8.3.3"
}
object ExcludeDependencies {
lazy val sparkCore2_11 = "org.apache.spark" % "spark-core_2.11"
lazy val sparkSQL2_11 = "org.apache.spark" % "spark-sql_2.11"
lazy val sparkYarn2_11 = "org.apache.spark" % "spark-yarn_2.11"
lazy val sparkStreaming2_11 = "org.apache.spark" % "spark-streaming_2.11"
}
Arquivo build.sbt
:
import Dependencies._
import ExcludeDependencies._
ThisBuild / scalaVersion := "2.12.16"
ThisBuild / version := "1.0.0"
ThisBuild / organization := "br.com.rtancman"
ThisBuild / organizationName := "rtancman"
ThisBuild / assembly / assemblyMergeStrategy := {
case PathList("META-INF", _*) => MergeStrategy.discard
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
lazy val root = (project in file("."))
.settings(
name := "rtancman-elastic-spark-examples",
libraryDependencies ++= Seq(
scalaTest % Test,
elasticSpark,
spark,
sparkSql
),
excludeDependencies ++= Seq(
sparkCore2_11,
sparkSQL2_11,
sparkYarn2_11,
sparkStreaming2_11
)
)
Arquivo project/plugins.sbt
:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
Vamos testar se tudo esta ok rodando sbt compile
. Devemos ter o seguinte resultado:
[success] Total time: 4 s, completed Aug 17, 2022, 4:44:51 PM
Para executar este programa precisamos de um cluster spark e do elasticsearch. Começando com o setup do spark:
wget https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
tar x spark-3.2.2-bin-hadoop2.7.tgz
export SPARK_HOME=`pwd`/spark-3.2.2-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME:$SPARK_HOME/bin:$SPARK_HOME/sbin
# test
spark-shell
Para o elasticsearch, vamos subir com o docker-compose. Edite o arquivo docker-compose.yml
com o conteúdo abaixo:
version: '3'
services:
elasticsearch:
image: us.gcr.io/jusbrasil-155317/jusbrasil/scooby-tenx-elastic-iteration-test-base:8.3.3
container_name: elasticsearch_spark
ports:
- "9200:9200"
- "9300:9300"
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
networks:
- elasticsearch_spark
kibana:
image: docker.elastic.co/kibana/kibana:8.3.3
ports:
- "5601:5601"
environment:
ELASTICSEARCH_URL: http://elasticsearch_spark:9200
ELASTICSEARCH_HOSTS: http://elasticsearch_spark:9200
networks:
- elasticsearch_spark
volumes:
es_data:
driver: local
networks:
elasticsearch_spark:
driver: bridge
Para rodar este spark job, vamos subir o elastic, compilar o nosso projeto gerando um .jar e rodar o comando spark-submit.
docker-compose up -d
sbt assembly
spark-submit \
--class br.com.rtancman.ElasticsearchSpark \
target/scala-2.12/rtancman-elastic-spark-examples-assembly-1.0.0.jar
Nosso docker-compose tem uma imagem do kibana e podemos acessar a url http://localhost:5601/app/dev_tools para realizar queries no elasticsearch.
GET /_cat/indices
GET /elasticsearch-spark-small-movies/_search
Se tudo correu conforme o esperado o seu resultado deve ser:
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 2,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "elasticsearch-spark-small-movies",
"_id": "2",
"_score": 1,
"_source": {
"id": "2",
"title": "John Wick",
"type": "movie"
}
},
{
"_index": "elasticsearch-spark-small-movies",
"_id": "1",
"_score": 1,
"_source": {
"id": "1",
"title": "Matrix",
"type": "movie"
}
}
]
}
}
Spark e Elasticsearch são duas ferramentas poderosas e neste artigo busquei detalhar como podemos trabalhar com elas em conjunto para gerar valor no nosso dia a dia.
Referências: