Skip to main content

Pulsar

Integration Details

The Datahub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingest the information into Datahub. The plugin uses the Pulsar admin Rest API interface to interact with the Pulsar instance. The following APIs are used in order to:

The data is extracted on tenant and namespace basis, topics with corresponding schema (if available) are ingested as Dataset into Datahub. Some additional values like schema description, schema_version, schema_type and partitioned are included as DatasetProperties.

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

Source ConceptDataHub ConceptNotes
pulsarData Platform
Pulsar TopicDatasetsubType: topic
Pulsar SchemaSchemaFieldMaps to the fields defined within the Avro or JSON schema definition.

Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our metadata ingestion guide. Incubating

Important Capabilities

CapabilityStatusNotes
DomainsSupported via the domain config field
Platform InstanceEnabled by default

PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)

NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).

Prerequisites

In order to ingest metadata from Apache Pulsar, you will need:

  • Access to a Pulsar Instance, if authentication is enabled a valid access token.
  • Pulsar version >= 2.7.0

NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
type: "pulsar"
config:
env: "TEST"
platform_instance: "local"
## Pulsar client connection config ##
web_service_url: "https://localhost:8443"
verify_ssl: "/opt/certs/ca.cert.pem"
# Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
issuer_url: <issuer_url>
client_id: ${CLIENT_ID}
client_secret: ${CLIENT_SECRET}
# Tenant list to scrape
tenants:
- tenant_1
- tenant_2
# Topic filter pattern
topic_patterns:
allow:
- ".*sales.*"

sink:
# sink configs

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

Field [Required]TypeDescriptionDefaultNotes
client_idstringThe application's client ID
client_secretstringThe application's client secret
exclude_individual_partitionsbooleanExtract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datasets.True
issuer_urlstringThe complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication.
oid_configobjectPlaceholder for OpenId discovery document
platform_instancestringThe instance of the platform that all assets produced by this recipe belong to
tenantsarray(string)
timeoutintegerTimout setting, how long to wait for the Pulsar rest api to send data before giving up5
tokenstringThe access token for the application. Mandatory for token based authentication.
verify_sslUnionType (See notes for variants)Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.TrueOne of boolean, string
web_service_urlstringThe web URL for the cluster.http://localhost:8080
envstringThe environment that all assets produced by this connector belong toPROD
domainmap(str,AllowDenyPattern)A class to store allow deny regexes
domain.key.allowarray(string)
domain.key.denyarray(string)
domain.key.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
namespace_patternsAllowDenyPatternList of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied.{'allow': ['.*'], 'deny': ['public/functions'], 'ignoreCase': True}
namespace_patterns.allowarray(string)
namespace_patterns.denyarray(string)
namespace_patterns.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
tenant_patternsAllowDenyPatternList of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed.{'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase': True}
tenant_patterns.allowarray(string)
tenant_patterns.denyarray(string)
tenant_patterns.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
topic_patternsAllowDenyPatternList of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied.{'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase': True}
topic_patterns.allowarray(string)
topic_patterns.denyarray(string)
topic_patterns.ignoreCasebooleanWhether to ignore case sensitivity during pattern matching.True
stateful_ingestionStatefulStaleMetadataRemovalConfigsee Stateful Ingestion
stateful_ingestion.enabledbooleanThe type of the ingestion state provider registered with datahub.False
stateful_ingestion.ignore_new_statebooleanIf set to True, ignores the current checkpoint state.False
stateful_ingestion.ignore_old_statebooleanIf set to True, ignores the previous checkpoint state.False
stateful_ingestion.remove_stale_metadatabooleanSoft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.True

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Pulsar, feel free to ping us on our Slack.