ArangoDB Datasource for Apache Spark
ArangoDB Datasource for Apache Spark allows batch reading and writing Spark DataFrame data from and to ArangoDB, by implementing the Spark Data Source V2 API.
Reading tasks are parallelized according to the number of shards of the related ArangoDB collection, while write tasks are parallelized according to the partitions of the source DataFrame. The network traffic is load balanced across the available DB Coordinators.
Filter predicates and column selections are pushed down to the DB by dynamically generating AQL queries, which will fetch only the strictly required data, thus saving network and computational resources both on the Spark and the DB side.
The connector is usable from all the Spark-supported client languages: Scala, Python, Java, and R.
This library works with all the non-EOLed ArangoDB versions.
Supported versions
There are several variants of this library, each one compatible with different Spark and Scala versions:
- com.arangodb:arangodb-spark-datasource-2.4_2.11 (Spark 2.4, Scala 2.11)
- com.arangodb:arangodb-spark-datasource-2.4_2.12 (Spark 2.4, Scala 2.12)
- com.arangodb:arangodb-spark-datasource-3.1_2.12 (Spark 3.1, Scala 2.12)
- com.arangodb:arangodb-spark-datasource-3.2_2.12 (Spark 3.2, Scala 2.12)
- com.arangodb:arangodb-spark-datasource-3.2_2.13 (Spark 3.2, Scala 2.13)
In the following sections the ${sparkVersion} and ${scalaVersion} placeholders refer to the Spark and Scala versions.
Setup
To import ArangoDB Datasource for Apache Spark in a Maven project:
<dependencies>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-spark-datasource-${sparkVersion}_${scalaVersion}</artifactId>
<version>x.y.z</version>
</dependency>
</dependencies>
To use in an external Spark cluster, submit your application with the following parameter:
--packages="com.arangodb:arangodb-spark-datasource-${sparkVersion}_${scalaVersion}:x.y.z"
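For sbt projects, the same artifact can be declared as a library dependency. The following is a sketch only: x.y.z is a placeholder as above, and the artifact suffix must match your Spark and Scala versions (here Spark 3.2 and Scala 2.12):

// build.sbt (sketch)
libraryDependencies += "com.arangodb" % "arangodb-spark-datasource-3.2_2.12" % "x.y.z"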
General Configuration
- user: db user, root by default
- password: db password
- endpoints: list of Coordinators, e.g. c1:8529,c2:8529 (required)
- acquireHostList: acquire the list of all known hosts in the cluster (true or false), false by default
- protocol: communication protocol (vst or http), http by default
- contentType: content type for driver communication (json or vpack), json by default
- timeout: driver connect and request timeout in ms, 300000 by default
- ssl.enabled: ssl secured driver connection (true or false), false by default
- ssl.cert.value: Base64 encoded certificate
- ssl.cert.type: certificate type, X.509 by default
- ssl.cert.alias: certificate alias name, arangodb by default
- ssl.algorithm: trust manager algorithm, SunX509 by default
- ssl.keystore.type: keystore type, jks by default
- ssl.protocol: SSLContext protocol, TLS by default
SSL
To use TLS secured connections to ArangoDB, set ssl.enabled to true and either:
- provide a Base64 encoded certificate as the ssl.cert.value configuration entry and optionally set the other ssl.* configuration entries, or
- start the Spark driver and workers with a properly configured JVM default TrustStore
Supported deployment topologies
The connector can work with single server, cluster, and active failover deployments of ArangoDB.
Batch Read
The connector implements support for batch reading from an ArangoDB collection.
val df: DataFrame = spark.read
.format("com.arangodb.spark")
.options(options) // Map[String, String]
.schema(schema) // StructType
.load()
The connector can read data from:
- a collection
- an AQL cursor (query specified by the user)
When reading data from a collection, the reading job is split into many Spark tasks, one for each shard in the ArangoDB source collection. The resulting Spark DataFrame has the same number of partitions as the number of shards in the ArangoDB collection, each one containing data from the respective collection shard. The reading tasks consist of AQL queries that are load balanced across all the available ArangoDB Coordinators. Each query is related to only one shard, therefore it will be executed locally in the DB-Server holding the related shard.
When reading data from an AQL cursor, the reading job cannot be partitioned or parallelized, so it will be less scalable. This mode can be used for reading data coming from different tables, e.g. the result of an AQL traversal query.
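As a sketch of the query-based mode, the read below runs a made-up AQL traversal query (graph and collection names are purely illustrative) instead of reading a collection; the resulting DataFrame has a single partition:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

// The "query" option replaces "table": the cursor result cannot be split,
// so this read is executed as a single, non-parallelized task.
val friendsDF: DataFrame = spark.read
  .format("com.arangodb.spark")
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "query" -> "FOR v IN 1..1 OUTBOUND 'users/1' GRAPH 'social' RETURN v"
  ))
  .schema(StructType(Array(
    StructField("name", StringType, nullable = true)
  )))
  .load()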
Example
val spark: SparkSession = SparkSession.builder()
.appName("ArangoDBSparkDemo")
.master("local[*]")
.config("spark.driver.host", "127.0.0.1")
.getOrCreate()
val df: DataFrame = spark.read
.format("com.arangodb.spark")
.options(Map(
"password" -> "test",
"endpoints" -> "c1:8529,c2:8529,c3:8529",
"table" -> "users"
))
.schema(new StructType(
Array(
StructField("likes", ArrayType(StringType, containsNull = false)),
StructField("birthday", DateType, nullable = true),
StructField("gender", StringType, nullable = false),
StructField("name", StructType(
Array(
StructField("first", StringType, nullable = true),
StructField("last", StringType, nullable = false)
)
), nullable = true)
)
))
.load()
df.filter(col("birthday") === "1982-12-15").show()
Read Configuration
- database: database name, _system by default
- table: datasource ArangoDB collection name, ignored if query is specified. Either table or query is required.
- query: custom AQL read query. If set, table is ignored. Either table or query is required.
- batchSize: reading batch size, 10000 by default
- sampleSize: sample size prefetched for schema inference, only used if the read schema is not provided, 1000 by default
- fillBlockCache: specifies whether the query should store the data it reads in the RocksDB block cache (true or false), false by default
- stream: specifies whether the query should be executed lazily, true by default
- mode: allows setting a mode for dealing with corrupt records during parsing:
  - PERMISSIVE: in case of a corrupted record, the malformed string is put into a field configured by columnNameOfCorruptRecord and the malformed fields are set to null. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If a schema does not have the field, corrupt records are dropped during parsing. When inferring a schema, the columnNameOfCorruptRecord field is implicitly added to the output schema.
  - DROPMALFORMED: ignores the whole corrupted record
  - FAILFAST: throws an exception in case of corrupted records
- columnNameOfCorruptRecord: allows renaming the new field holding the malformed string created by the PERMISSIVE mode
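For example, a PERMISSIVE read keeping corrupt records could look like the following sketch, using a made-up field name (_corrupt_record) to hold the malformed strings:

import org.apache.spark.sql.types._

// Records that do not match the schema are kept: their raw content is stored
// in the _corrupt_record field, while the remaining fields are set to null.
val usersDF = spark.read
  .format("com.arangodb.spark")
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users",
    "mode" -> "PERMISSIVE",
    "columnNameOfCorruptRecord" -> "_corrupt_record"
  ))
  .schema(StructType(Array(
    StructField("birthday", DateType, nullable = true),
    StructField("_corrupt_record", StringType, nullable = true)
  )))
  .load()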
Predicate and Projection Pushdown
The connector can convert some Spark SQL filter predicates into AQL predicates and push their execution down to the data source. In this way, ArangoDB can apply the filters and return only the matching documents.
The following filter predicates (implementations of org.apache.spark.sql.sources.Filter) are pushed down:
- And
- Or
- Not
- EqualTo
- EqualNullSafe
- IsNull
- IsNotNull
- GreaterThan
- GreaterThanOrEqual
- LessThan
- LessThanOrEqual
- StringStartsWith
- StringEndsWith
- StringContains
- In
Furthermore, the connector pushes down the subset of columns required by the Spark query, so that only the relevant document fields are returned.
Predicate and projection pushdowns are only performed while reading an ArangoDB collection (set by the table configuration parameter). In case of a batch read from a custom query (set by the query configuration parameter), no pushdown optimizations are performed.
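As a sketch, the following query on the df DataFrame from the example above pushes both the birthday predicate and the column selection down to ArangoDB, so the generated AQL only returns the name field of the matching documents:

import org.apache.spark.sql.functions.col

// Filter and projection are pushed down because df was loaded with the "table" option.
df.filter(col("birthday") === "1982-12-15")
  .select(col("name"))
  .show()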
Read Resiliency
The data of each partition is read using an AQL cursor. If any error occurs, the read task of the related partition will fail. Depending on the Spark configuration, the task could be retried.
Batch Write
The connector implements support for batch writing to an ArangoDB collection.
import org.apache.spark.sql.DataFrame
val df: DataFrame = //...
df.write
.format("com.arangodb.spark")
.mode(SaveMode.Append)
.options(Map(
"password" -> "test",
"endpoints" -> "c1:8529,c2:8529,c3:8529",
"table" -> "users"
))
.save()
Write tasks are load balanced across the available ArangoDB Coordinators. The data saved into ArangoDB is sharded according to the target collection definition, which is independent of the Spark DataFrame partitioning.
SaveMode
On writing, org.apache.spark.sql.SaveMode is used to specify the expected behavior in case the target collection already exists.
Spark 2.4 implementation supports all save modes with the following semantics:
- Append: the target collection is created if it does not exist.
- Overwrite: the target collection is created if it does not exist, otherwise it is truncated. Use it in combination with the confirmTruncate write configuration parameter.
- ErrorIfExists: the target collection is created if it does not exist, otherwise an AnalysisException is thrown.
- Ignore: the target collection is created if it does not exist, otherwise no write is performed.
Spark 3 implementations support:
- Append: the target collection is created if it does not exist.
- Overwrite: the target collection is created if it does not exist, otherwise it is truncated. Use it in combination with the confirmTruncate write configuration parameter.
In Spark 3 implementations, the ErrorIfExists and Ignore save modes behave the same as Append.
Use the overwriteMode write configuration parameter to specify the document overwrite behavior (if a document with the same _key already exists).
Write Configuration
- table: target ArangoDB collection name (required)
- batchSize: writing batch size, 10000 by default
- byteBatchSize: byte batch size threshold, only considered for contentType=json, 8388608 by default (8 MB)
- table.shards: number of shards of the created collection (in case of the Append or Overwrite SaveMode)
- table.type: type (document or edge) of the created collection (in case of the Append or Overwrite SaveMode), document by default
- waitForSync: specifies whether to wait until the documents have been synced to disk (true or false), false by default
- confirmTruncate: confirms to truncate the table when using the Overwrite SaveMode, false by default
- overwriteMode: configures the behavior in case a document with the specified _key value already exists. It is only considered for the Append SaveMode.
  - ignore (default for SaveMode other than Append): the document is not written
  - replace: the existing document is overwritten with the specified document value
  - update: the existing document is patched (partially updated) with the specified document value. The overwrite mode can be further controlled via the keepNull and mergeObjects parameters. keepNull is also automatically set to true, so that null values are kept in the saved documents and not used to remove existing document fields (as in the default ArangoDB upsert behavior).
  - conflict (default for the Append SaveMode): a unique constraint violation error is returned so that the insert operation fails
- mergeObjects: in case overwriteMode is set to update, controls whether objects (not arrays) are merged:
  - true (default): objects are merged
  - false: existing document fields are overwritten
- keepNull: in case overwriteMode is set to update:
  - true (default): null values are saved within the document
  - false: null values are used to delete the corresponding existing attributes
- retry.maxAttempts: max attempts for retrying write requests in case they are idempotent, 10 by default
- retry.minDelay: min delay in ms between write request retries, 0 by default
- retry.maxDelay: max delay in ms between write request retries, 0 by default
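For example, a write in Append mode that patches documents with an already existing _key instead of failing could be configured like the following sketch (credentials and collection name as in the earlier examples):

import org.apache.spark.sql.{DataFrame, SaveMode}

val df: DataFrame = //...
df.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users",
    "overwriteMode" -> "update", // patch documents whose _key already exists
    "mergeObjects" -> "true",    // merge nested objects instead of replacing them
    "waitForSync" -> "true"      // wait until the documents are synced to disk
  ))
  .save()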
Write Resiliency
The data of each partition is saved in batches using the ArangoDB API for inserting multiple documents. This operation is not atomic, therefore some documents could be successfully written to the database, while others could fail. To make the job more resilient to temporary errors (e.g. connectivity problems), in case of failure the request is retried (against another Coordinator) if the provided configuration allows idempotent requests, namely:
- the schema of the DataFrame has a non-nullable _key field, and
- overwriteMode is set to one of the following values:
  - replace
  - ignore
  - update with keepNull=true
These configurations are also compatible with speculative execution of tasks.
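For instance, the following sketch declares a non-nullable _key field in the schema and uses overwriteMode=replace, so that batch writes are idempotent and failed requests can safely be retried (data and collection name are made up for illustration):

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types._

// A non-nullable _key plus overwriteMode=replace makes re-writing the same rows idempotent.
val schema = StructType(Array(
  StructField("_key", StringType, nullable = false),
  StructField("name", StringType, nullable = true)
))
val rows = Seq(Row("1", "Alice"), Row("2", "Bob"))
val writeDF = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

writeDF.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users",
    "overwriteMode" -> "replace",
    "retry.maxAttempts" -> "10"
  ))
  .save()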
A failing batch-saving request is retried once for every Coordinator. After that, if still failing, the write task for the related partition is aborted. According to the Spark configuration, the task can be retried and rescheduled on a different executor, if the provided write configuration allows idempotent requests (as described above).
If a task ultimately fails and is aborted, the entire write job will be aborted as well. Depending on the SaveMode configuration, the following cleanup operations will be performed:
- Append: no cleanup is performed and the underlying data source may require manual cleanup; a DataWriteAbortException is thrown.
- Overwrite: the target collection is truncated.
- ErrorIfExists: the target collection is dropped.
- Ignore: if the collection did not exist before, it is dropped; otherwise, nothing is done.
Write requirements
When writing to an edge collection (table.type=edge), the schema of the DataFrame being written must have:
- a non-nullable string field named _from, and
- a non-nullable string field named _to
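A minimal sketch of an edge collection write, with made-up vertex references and collection name, could look like this:

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types._

// _from and _to are required, non-nullable string fields referencing existing vertices.
val edgeSchema = StructType(Array(
  StructField("_from", StringType, nullable = false),
  StructField("_to", StringType, nullable = false),
  StructField("since", IntegerType, nullable = true)
))
val edges = Seq(Row("users/1", "users/2", 2015))
val edgesDF = spark.createDataFrame(spark.sparkContext.parallelize(edges), edgeSchema)

edgesDF.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "knows",
    "table.type" -> "edge" // create the target collection as an edge collection
  ))
  .save()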
Write Limitations
- Batch writes are not performed atomically, so sometimes (e.g. in case of overwriteMode: conflict) several documents in the batch may be written while others may return an exception (e.g. due to a conflicting key).
- Writing records with the _key attribute is only allowed on collections sharded by _key.
- In case of the Append save mode, failed jobs cannot be rolled back and the underlying data source may require manual cleanup.
- Speculative execution of tasks only works for idempotent write configurations. See Write Resiliency for more details.
Mapping Configuration
Serialization and deserialization of Spark DataFrame Rows to and from JSON (or VelocyPack) can be customized using the following options:
- ignoreNullFields: whether to ignore null fields during serialization, false by default (only supported in Spark 3.x)
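For example (Spark 3.x only), null fields can be dropped from the serialized documents like this sketch, assuming a DataFrame df and the same connection options as above:

df.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users",
    "ignoreNullFields" -> "true" // null DataFrame fields are not serialized
  ))
  .save()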
Supported Spark data types
The following Spark SQL data types (subtypes of org.apache.spark.sql.types.DataType) are supported for reading, writing, and filter pushdown.
- Numeric types: ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType
- String types: StringType
- Boolean types: BooleanType
- Datetime types: TimestampType, DateType
- Complex types: ArrayType, MapType (only with key type StringType), StructType
Connect to ArangoDB Oasis
To connect to SSL secured deployments using X.509 Base64 encoded CA certificate (Oasis):
val options = Map(
"database" -> "<dbname>",
"user" -> "<username>",
"password" -> "<passwd>",
"endpoints" -> "<endpoint>:<port>",
"ssl.cert.value" -> "<base64 encoded CA certificate>",
"ssl.enabled" -> "true",
"table" -> "<table>"
)
// read
val myDF = spark.read
.format("com.arangodb.spark")
.options(options)
.load()
// write
import org.apache.spark.sql.DataFrame
val df: DataFrame = //...
df.write
.format("com.arangodb.spark")
.options(options)
.save()
Current limitations
- For contentType=vpack, implicit deserialization casts don't work well, e.g. reading a document having a field with a numeric value whereas the related read schema requires a string value for such a field.
- Date and timestamp fields are interpreted to be in the UTC time zone.
- In Spark 2.4, for corrupted records in batch reading, partial results are not supported. All fields other than the field configured by columnNameOfCorruptRecord are set to null (SPARK-26303).
- In read jobs using stream=true (the default), possible AQL warnings are only logged at the end of each read task (BTS-671).
- Spark SQL DecimalType fields are not supported in write jobs when using contentType=json.
- Spark SQL DecimalType values are written to the database as strings.
- byteBatchSize is only considered for contentType=json (DE-226).
Demo
Check out our demo to learn more about ArangoDB Datasource for Apache Spark.