Index Common Crawl with Elasticsearch without a running cluster

Common Crawl is an amazing project

I’m always surprised when people I talk to are unfamiliar with Common Crawl. It’s one of the most remarkable and empowering projects in a world where governments and corporations seem hell-bent on taking power away from us.

The project hosts eight years of web data grouped in three main categories: raw web pages, metadata extracts and text extracts, along with derivative works such as a file index and a link graph with rankings for top domains and for every page.

There are a ton of examples of how to process WARC files; most of them center around Python and Java, with a good mixture of Scala, Clojure, Rust and, surprisingly, Ruby. Given the petabytes available, you usually need a hefty Hadoop cluster to process one month’s dump of around 2.5 billion pages (some 198 TiB), consisting of WARC, WAT and WET files as well as a Parquet index with limited metadata about every page indexed.

warc, wet, wat, WHAT?

There are three main kinds of files produced each month, all of them in the Web ARChive format, and there are plenty of libraries that can extract data from it in all the languages mentioned above. Naturally the data is compressed as well as broken down into segments; a path to one of the WARC files looks like

s3://commoncrawl/crawl-data/CC-MAIN-2019-04/segments/1547583659944.3/warc/CC-MAIN-20190118070121-20190118092121-00160.warc.gz

with the date and segment (partition) visible. You can access the files for free if you have an AWS account; they are stored in us-east-1.

file type | stores
WARC      | everything: crawl info, response headers, html metadata, content
WAT       | crawl info, response headers, html metadata
WET       | text content only, with all html tags removed

Since actual examples are worth a thousand words, here is an example from the BBC with content of every kind.
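
If you want to poke at one of these files yourself, a quick sketch like the one below iterates over its records. It assumes you have downloaded the WARC file locally and have the jwarc library (org.netpreserve.jwarc) on the classpath; none of this is part of Common Crawl itself, it’s just one convenient way to read the format.

import java.nio.file.Paths
import org.netpreserve.jwarc.{WarcReader, WarcResponse}
import scala.collection.JavaConverters._

// Walk every record in a (gzipped) WARC file and print the target URL of each HTTP response
val reader = new WarcReader(Paths.get("CC-MAIN-20190118070121-20190118092121-00160.warc.gz"))
try {
  reader.iterator().asScala.foreach {
    case response: WarcResponse => println(response.target()) // the crawled URL
    case _                      => ()                         // request, metadata and warcinfo records
  }
} finally reader.close()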

Elasticsearch is a complex beast

I rarely encounter someone unaware of Elasticsearch; even if they don’t work in the tech industry, the latest controversy made waves outside our bubble. Here is how Elastic (the company) describes Elasticsearch (the product):

Elasticsearch is a distributed, RESTful search and analytics engine capable of solving a growing number of use cases. As the heart of the Elastic Stack, it centrally stores your data so you can discover the expected and uncover the unexpected.

That is a fancy way of saying we built a distributed system with all the nuts and bolts one might expect on top of the Lucene index library so you don’t have to.

If at this point you’re not familiar with Elasticsearch then don’t bother reading further; close the tab or go back. A hacky version of data loading that involves carefully tricking ES into indexing in the same place where Spark executors run is not for the faint of heart. In my previous conversations with Elastic they have been reluctant to support this method, so if you really need that kind of support you’re a bit out of luck.

Elasticsearch and Apache Spark

Apache Spark is the prime candidate for processing the huge amount of data available in any of the Common Crawl monthly crawls. Elastic even provides easy tooling that lets you lift any DataFrame into a running Elasticsearch cluster.


// saveToEs is provided by the elasticsearch-spark (elasticsearch-hadoop) connector
import org.elasticsearch.spark.sql._

...

// spark = existing SparkSession
import spark.implicits._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

// create the DataFrame from a comma-separated text file
val people = spark.read.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()

// index every row into the cluster configured through es.nodes
people.saveToEs("spark/people")

If you’ve got this far and the above works for your requirements, then don’t read any further. You’ve probably set up an Elasticsearch cluster and spent the time to tune it for both indexing and searching. You’ve got the number of shards set up just right and have basically wasted a few minutes reading things you already knew.

Locality Locality Locality

Regular indexing from Apache Spark into Elasticsearch works by having every executor send requests to a separate Elasticsearch cluster as it processes each partition of the Dataset. The Hadoop cluster where Spark runs and the Elasticsearch cluster where documents are indexed will differ in size and setup, and you will need to tune Elasticsearch for read-heavy workloads as well as for indexing, which is not easy.

The penalty for moving data over the network is acceptable in a lot of cases; however, once the data gets into the TB range, indexing time grows from hours to days to weeks.

One option explored here is starting an embedded Elasticsearch node per task attempt (elastic_local_node_2_0 being the local node serving task 2, attempt 0) and directing all indexing requests to that local node. The key is that all indices must be created with the same number of shards as there are Dataset partitions, and every indexing request must be directed to a single shard. This is the single most important factor: it avoids going over the network into a live, separate Elasticsearch cluster and keeps processing and data local. The code in the footnote still makes requests over HTTP instead of simply passing down objects, but it’s a vast improvement that avoids sending your entire dataset over the wire.

The way to successfully trick Elasticsearch is to use the same constant string as the routing key when indexing. This tells the local ES instance that the index has, say, 128 shards but we only ever write into a single one of them. We do that with every Spark DataFrame partition and thus end up with 128 shards, each located where its DataFrame partition resides.
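
To make the trick concrete, here is roughly what the per-partition requests look like when written with elastic4s; the index name docs, the shard count and the routing value are illustrative, not taken from the actual code.

import com.sksamuel.elastic4s.http.ElasticDsl._

// the index is declared with as many shards as the Dataset has partitions (128 here)
val create = createIndex("docs").shards(128).replicas(0)

// every document indexed on this node carries the same constant routing value,
// so the local node only ever fills one of those 128 shards
val request = indexInto("docs" / "doc")
  .routing("0")
  .fields("url" -> "https://www.bbc.com/news", "title" -> "BBC News")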

The idea is a hack, nevertheless one that can save days of indexing time. Here is a table attempting to illustrate how one index named docs is laid out on top of Spark partitions.

partition/shard | shard 1   | shard 2   | shard 3   | Final Index on HDFS or S3
partition 1     | 3645 docs |           |           | 3645 docs
partition 2     |           | 3507 docs |           | 3507 docs
partition 3     |           |           | 3783 docs | 3783 docs

Doing this manually turns out to be a lot harder than it needs to be for a variety of reasons: the embedded Elasticsearch server is deprecated, the dependencies of Spark and Elasticsearch conflict, and Elasticsearch now starts in cluster mode broadcasting its address to its friends. None of this is helpful when trying to start one server to index one set of documents and then throw the instance away.
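
For a sense of what “one throwaway server” means in practice, the settings such a node ends up needing look roughly like the following. The keys are ordinary Elasticsearch settings, but the values, and the idea of collecting them in a plain Map, are purely illustrative and not the project’s actual configuration.

// Illustrative settings for a short-lived, task-local Elasticsearch node
val nodeSettings = Map(
  "cluster.name"   -> "elastic_local_node_2_0", // one throwaway cluster per task attempt
  "node.name"      -> "elastic_local_node_2_0",
  "discovery.type" -> "single-node",            // do not look for, or advertise to, other nodes
  "http.port"      -> "9250",                   // avoid clashing with anything else on the host
  "path.data"      -> "/tmp/es-task-2-0/data",  // scratch space local to the executor
  "path.repo"      -> "/tmp/es-task-2-0/repo"   // where the snapshot lands before the copy to S3
)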

Typeclasses are like Interfaces or Protocols but different

Here is a typeclass that indexes a type Doc and returns a type Res while using a context Ctx; it is not a particularly handsome typeclass but it will do the job.


trait OfflineIndex[Ctx, Res, Doc] extends Serializable {

  def partitionContext: Ctx

  def init(shards: OfflineConf)(tc: ElasticClient): Task[ElasticClient]

  def indices: Seq[String]

  def indexBatch(es: ElasticClient, c: Ctx, ts: Seq[Doc]): Res

}
  1. partitionContext allows you to create an object in the mapPartitions phase, something that is expensive to build and can then be applied to every element of type Doc
  2. init is where the creation of indices on the local Elasticsearch node takes place
  3. indices is the list of every index that needs to be moved to permanent storage, HDFS or S3
  4. indexBatch is where the actual document indexing happens

This setup is obviously harder to understand than saveToEs, but it avoids having to create an Elasticsearch cluster and optimize it for indexing as well as searching.
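
To show where each method fits, here is a rough sketch, not the library’s actual code, of how an OfflineIndex instance could be driven from mapPartitions. startLocalNode is a hypothetical hook that boots the throwaway local node and hands back a client pointing at it; the batch size and everything else are made up.

import com.sksamuel.elastic4s.http.ElasticClient
import monix.execution.Scheduler.Implicits.global
import org.apache.spark.sql.{Dataset, Encoder}

def indexPartitions[Ctx, Res: Encoder, Doc](ds: Dataset[Doc],
                                            conf: OfflineConf,
                                            startLocalNode: () => ElasticClient)
                                           (implicit oi: OfflineIndex[Ctx, Res, Doc]): Dataset[Res] =
  ds.mapPartitions { docs =>
    val client = startLocalNode()                      // one embedded node per task attempt
    val ready  = oi.init(conf)(client).runSyncUnsafe() // create the indices on the local node
    val ctx    = oi.partitionContext                   // per-partition context, built once
    docs.grouped(1000).map(batch => oi.indexBatch(ready, ctx, batch))
    // once the iterator is exhausted, oi.indices would be snapshotted and shipped to HDFS/S3
  }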

Simple implementation of an OfflineIndex

Here is an implementation for a simple case class with two string fields; it relies on the excellent Elastic4s library.

// the case class
case class Country(name: String, capital: String)

object Country {
  implicit val countryIsOfflineIndex: OfflineIndex[Unit, BulkResponse, Country] =
    new OfflineIndex[Unit, BulkResponse, Country] {
      // No context this time
      override def partitionContext: Unit = ()

      import com.sksamuel.elastic4s.http.ElasticDsl._

      // create the index and raise an error if it doesn't work
      override def init(shards: OfflineConf)(tc: ElasticClient): Task[ElasticClient] = Task.deferFuture {
        tc.execute(
          createIndex("places").mappings(
            mapping("country").fields(List(
              textField("name"),
              textField("capital")
            ))))
      }.flatMap {
        case _: RequestSuccess[CreateIndexResponse] => Task(tc)
        case f: RequestFailure => Task.raiseError(new RuntimeException(s"${f.error}"))
      }

      // list the indices to move to permanent storage once indexing is complete
      override def indices: Seq[String] = Seq("places")

      // do the actual work of indexing into Elasticsearch
      override def indexBatch(es: ElasticClient, c: Unit, ts: Seq[Country]): BulkResponse = es.execute(
        bulk(ts.map(
          c => indexInto("places" / "country")
            .fields("name" -> c.name, "capital" -> c.capital)
        ))).await.result
    }
}

With OfflineIndex implemented the next step is to take any Dataset[Country] and create an Elasticsearch snapshot that can later be loaded into a running cluster.


import java.net.URI

import offline._
import monix.execution.Scheduler

// needed for .as[Country]
import spark.implicits._

// the snapshot repository that will end up on S3
val repo = new URI("s3://my_bucket/countriesRepo")

val countries = spark.read.parquet("path_to_countries").as[Country]
countries.indexPartitionHttp2(20, repo, OfflineConf(store = true, 64))
  .cache().count // count just tells Spark to run the computation, any action will work

// Now create the snapshot
implicit val scheduler = Scheduler.io()
EsNode.renameIndicesAndSnapshot(repo.toString).runSyncUnsafe()

Load into Elasticsearch cluster

Once the above gets packaged and executed on Apache Spark there is a fresh snapshot waiting in s3://my_bucket/countriesRepo. The guide at Elastic on snapshots is quite extensive, but the summary is:

  1. Register the snapshot repository

    location refers to path.repo in the elasticsearch.yml file; if it’s not relative to that, Elasticsearch will not load it. This is only an issue if the path is on the local disk; S3 repository support is provided as a plugin.

    
       curl -X PUT "localhost:9200/_snapshot/my_backup" -H 'Content-Type: application/json' -d'
       {
         "type": "fs",
         "settings": {
           "location": "countries_repo_synced_from_s3",
           "compress": false
         }
       }
       '
    
  2. Sync the snapshot (optional)

    You can copy the files generated in s3://my_bucket/countriesRepo to a separate local path (e.g. countries_repo_synced_from_s3) relative to path.repo, or leave them where they are if the backup location already points to S3.

  3. Restore the snapshot

    With everything aligned, a simple POST call with the name of the backup and the name of the snapshot restores the indices with the correct number of shards and, most importantly, the data.

#get the snapshot list
curl -X GET "localhost:9200/_snapshot/_all"
#restore snapshot_1
curl -X POST "localhost:9200/_snapshot/my_backup/snapshot_1/_restore"

Assuming you replaced localhost:9200 with the address of the actual cluster you should have the snapshot on S3 restored and all the indices created during the Spark job execution.

Footnote

There is an example here that makes this article sort of make sense. It allows you to create a snapshot from Common Crawl by passing a list of websites to be indexed, and it solves all the issues that I encountered while trying to stick Elasticsearch into Apache Spark. All you need to do is understand all the scripts, get an AWS account with sufficient permissions to run an EMR cluster, find the right parameters in the right scripts, change them to fit your needs, run the job, watch it fail, change more things and run it again, and in no time you’ll have a snapshot saved in your S3 bucket.