connectors
Creates, updates, deletes, gets or lists a connectors resource.
Overview
| Name | connectors |
| Type | Resource |
| Id | confluent.connect.connectors |
Fields
The following fields are returned by SELECT queries:
- read_connectv1_connector
- list_connectv1_connectors
Connector.
| Name | Datatype | Description |
|---|---|---|
| name | string | Name of the connector. |
| config | object | Configuration parameters for the connector. These are the minimum set of key-value pairs (KVPs) that define how the connector connects Kafka to the external system. Some KVPs are common to all connectors, such as Kafka connection parameters and connector metadata. The common connector configurations are: cloud.environment, cloud.provider, connector.class, kafka.api.key, kafka.api.secret, kafka.endpoint, kafka.region, name. A specific connector such as GcsSink has additional parameters such as gcs.bucket.name and flush.size. |
| tasks | array | List of active tasks generated by the connector. |
| type | string | Type of connector: sink or source. |
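The config description above lists a set of keys common to all connectors. As a minimal sketch, a config object can be checked against that list before it is submitted. The key list is copied from the description; the helper name `missing_common_keys` and the sample values are hypothetical, and this page does not state whether the API rejects a config that omits any of these keys.

```python
# Common configuration keys, copied from the config field description.
COMMON_CONFIG_KEYS = (
    "cloud.environment",
    "cloud.provider",
    "connector.class",
    "kafka.api.key",
    "kafka.api.secret",
    "kafka.endpoint",
    "kafka.region",
    "name",
)


def missing_common_keys(config):
    """Return the common keys that are absent from a connector config dict."""
    return [key for key in COMMON_CONFIG_KEYS if key not in config]


# Hypothetical GcsSink-style config; every value is a placeholder.
example_config = {
    "connector.class": "GcsSink",
    "name": "my-gcs-sink",
    "kafka.api.key": "<key>",
    "kafka.api.secret": "<secret>",
    "gcs.bucket.name": "<bucket>",
    "flush.size": "1000",
}

# Lists the common keys that still need to be supplied.
print(missing_common_keys(example_config))
```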
Connector.
| Name | Datatype | Description |
|---|---|---|
| connectv1_connector | string | |
Methods
The following methods are available for this resource:
| Name | Accessible by | Required Params | Optional Params | Description |
|---|---|---|---|---|
| read_connectv1_connector | select | connector_name, environment_id, kafka_cluster_id | | Get information about the connector. |
| list_connectv1_connectors | select | environment_id, kafka_cluster_id | | Retrieve a list of the names of the active connectors. You can then make a read request for a specific connector by name. |
| create_connectv1_connector | insert | environment_id, kafka_cluster_id | | Create a new connector. Returns the new connector information if successful. |
| delete_connectv1_connector | delete | connector_name, environment_id, kafka_cluster_id | | Delete a connector. Halts all tasks and deletes the connector configuration. |
| list_connectv1_connectors_with_expansions | exec | environment_id, kafka_cluster_id | expand | Retrieve an object with the queried expansions of all connectors. Without the expand query parameter, this endpoint returns only a list of connector names. |
| pause_connectv1_connector | exec | connector_name, environment_id, kafka_cluster_id | | Pause the connector and its tasks. Stops message processing until the connector is resumed. This call is asynchronous, and the tasks will not all transition to the PAUSED state at the same time. |
| resume_connectv1_connector | exec | connector_name, environment_id, kafka_cluster_id | | Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous, and the tasks will not all transition to the RUNNING state at the same time. |
| restart_connectv1_connector | exec | connector_name, environment_id, kafka_cluster_id | | Restart the connector and its tasks. Stops message processing until the connector and tasks are restarted. This call is asynchronous, and the connector will not transition to another state at the same time. |
Parameters
Parameters can be passed in the WHERE clause of a query. Check the Methods section to see which parameters are required or optional for each operation.
| Name | Datatype | Description |
|---|---|---|
| connector_name | string | The unique name of the connector. |
| environment_id | string | The unique identifier of the environment this resource belongs to. |
| kafka_cluster_id | string | The unique identifier for the Kafka cluster. |
| expand | string | Expansion to apply to each connector. id: returns metadata of each connector such as id and id type. info: returns metadata of each connector such as the configuration, task information, and type of connector. status: returns additional state information of each connector, including its status and tasks. |
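The expand parameter accepts the three values described above. The following is a minimal sketch of building and validating that value before it is passed to a query. The helper name `build_expand` is hypothetical, and combining several expansions with commas is an assumption that this page does not confirm.

```python
# Expansion values taken from the expand parameter description.
ALLOWED_EXPANSIONS = ("id", "info", "status")


def build_expand(*expansions):
    """Join the requested expansions into one value for the expand parameter.

    Assumption: multiple expansions are passed as a comma-separated string.
    """
    unknown = [e for e in expansions if e not in ALLOWED_EXPANSIONS]
    if unknown:
        raise ValueError(f"unsupported expansion(s): {', '.join(unknown)}")
    # dict.fromkeys drops duplicates while keeping the caller's order.
    return ",".join(dict.fromkeys(expansions))


print(build_expand("info", "status"))
```

Rejecting unknown values locally gives an early, readable error instead of relying on the remote endpoint to report it.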
SELECT examples
- read_connectv1_connector
- list_connectv1_connectors
Get information about the connector.
SELECT
name,
config,
tasks,
type
FROM confluent.connect.connectors
WHERE connector_name = '{{ connector_name }}' -- required
AND environment_id = '{{ environment_id }}' -- required
AND kafka_cluster_id = '{{ kafka_cluster_id }}' -- required
;
Retrieve a list of the names of the active connectors. You can then make a read request for a specific connector by name.
SELECT
connectv1_connector
FROM confluent.connect.connectors
WHERE environment_id = '{{ environment_id }}' -- required
AND kafka_cluster_id = '{{ kafka_cluster_id }}' -- required
;
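The queries on this page are templates with `{{ ... }}` placeholders. As a minimal sketch, the placeholders can be filled with a few lines of standard-library Python before the query is handed to whatever client runs it. The `render` helper, the placeholder pattern, and the sample identifiers are all hypothetical. Doubling single quotes is the standard SQL way to keep a value inside its string literal, but it is not a substitute for proper parameter handling in the client.

```python
import re

# Matches placeholders such as {{ environment_id }} or {{ connector.class }}.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template, values):
    """Replace every {{ key }} placeholder in template with values[key]."""

    def substitute(match):
        key = match.group(1)
        if key not in values:
            raise KeyError(f"no value supplied for placeholder {key!r}")
        # Double single quotes so the value stays inside its SQL string literal.
        return str(values[key]).replace("'", "''")

    return PLACEHOLDER.sub(substitute, template)


LIST_QUERY = """SELECT
connectv1_connector
FROM confluent.connect.connectors
WHERE environment_id = '{{ environment_id }}'
AND kafka_cluster_id = '{{ kafka_cluster_id }}'
;"""

# The identifiers below are made-up placeholders, not real resources.
print(render(LIST_QUERY, {"environment_id": "env-00000",
                          "kafka_cluster_id": "lkc-00000"}))
```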
INSERT examples
- create_connectv1_connector
- Manifest
Create a new connector. Returns the new connector information if successful.
INSERT INTO confluent.connect.connectors (
name,
config,
offsets,
environment_id,
kafka_cluster_id
)
SELECT
'{{ name }}',
'{{ config }}',
'{{ offsets }}',
'{{ environment_id }}',
'{{ kafka_cluster_id }}'
RETURNING
name,
config,
offsets,
tasks,
type
;
# Description fields are for documentation purposes
- name: connectors
  props:
    - name: environment_id
      value: "{{ environment_id }}"
      description: Required parameter for the connectors resource.
    - name: kafka_cluster_id
      value: "{{ kafka_cluster_id }}"
      description: Required parameter for the connectors resource.
    - name: name
      value: "{{ name }}"
      description: |
        Name of the connector to create.
    - name: config
      description: |
        Configuration parameters for the connector. All values should be strings.
      value:
        connector.class: "{{ connector.class }}"
        name: "{{ name }}"
        kafka.api.key: "{{ kafka.api.key }}"
        kafka.api.secret: "{{ kafka.api.secret }}"
        confluent.connector.type: "{{ confluent.connector.type }}"
        confluent.custom.plugin.id: "{{ confluent.custom.plugin.id }}"
        confluent.custom.connection.endpoints: "{{ confluent.custom.connection.endpoints }}"
        confluent.custom.schema.registry.auto: "{{ confluent.custom.schema.registry.auto }}"
        confluent.custom.connect.plugin.runtime: "{{ confluent.custom.connect.plugin.runtime }}"
        confluent.custom.connect.java.version: "{{ confluent.custom.connect.java.version }}"
    - name: offsets
      description: |
        Array of offsets which are categorised into partitions.
      value:
        - partition: "{{ partition }}"
          offset: "{{ offset }}"
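The manifest notes that all config values should be strings. A minimal sketch of normalising a config before it is serialised for the `{{ config }}` placeholder is shown below. The helper name `stringify_config` and the sample values are hypothetical. Two points are assumptions: that booleans should be written as lowercase `true`/`false`, and that the config column accepts a JSON-encoded object.

```python
import json


def stringify_config(config):
    """Return a copy of config with every value converted to a string."""
    result = {}
    for key, value in config.items():
        if isinstance(value, bool):
            # Assumption: booleans are expected in lowercase form.
            result[key] = "true" if value else "false"
        else:
            result[key] = str(value)
    return result


# Hypothetical config mixing strings, integers, and booleans.
raw_config = {
    "connector.class": "GcsSink",
    "name": "my-gcs-sink",
    "flush.size": 1000,
    "confluent.custom.schema.registry.auto": True,
}

# JSON text that could be substituted for the config placeholder.
print(json.dumps(stringify_config(raw_config), indent=2))
```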
DELETE examples
- delete_connectv1_connector
Delete a connector. Halts all tasks and deletes the connector configuration.
DELETE FROM confluent.connect.connectors
WHERE connector_name = '{{ connector_name }}' -- required
AND environment_id = '{{ environment_id }}' -- required
AND kafka_cluster_id = '{{ kafka_cluster_id }}' -- required
;
Lifecycle Methods
- list_connectv1_connectors_with_expansions
- pause_connectv1_connector
- resume_connectv1_connector
- restart_connectv1_connector
Retrieve an object with the queried expansions of all connectors. Without the expand query parameter, this endpoint returns only a list of connector names.
EXEC confluent.connect.connectors.list_connectv1_connectors_with_expansions
@environment_id='{{ environment_id }}', -- required
@kafka_cluster_id='{{ kafka_cluster_id }}', -- required
@expand='{{ expand }}'
;
Pause the connector and its tasks. Stops message processing until the connector is resumed. This call is asynchronous, and the tasks will not all transition to the PAUSED state at the same time.
EXEC confluent.connect.connectors.pause_connectv1_connector
@connector_name='{{ connector_name }}', -- required
@environment_id='{{ environment_id }}', -- required
@kafka_cluster_id='{{ kafka_cluster_id }}' -- required
;
Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous, and the tasks will not all transition to the RUNNING state at the same time.
EXEC confluent.connect.connectors.resume_connectv1_connector
@connector_name='{{ connector_name }}', -- required
@environment_id='{{ environment_id }}', -- required
@kafka_cluster_id='{{ kafka_cluster_id }}' -- required
;
Restart the connector and its tasks. Stops message processing until the connector and tasks are restarted. This call is asynchronous, and the connector will not transition to another state at the same time.
EXEC confluent.connect.connectors.restart_connectv1_connector
@connector_name='{{ connector_name }}', -- required
@environment_id='{{ environment_id }}', -- required
@kafka_cluster_id='{{ kafka_cluster_id }}' -- required
;
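Because the pause, resume, and restart calls are asynchronous, a caller that needs every task in the target state has to poll for it. The sketch below shows one way to do that under stated assumptions: `get_task_states` is a hypothetical callable, supplied by the caller, that returns the current state string of each task (for example, by running the status expansion and extracting the task states). Nothing here is part of the provider itself.

```python
import time


def wait_for_state(get_task_states, target, timeout=60.0, interval=2.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll until every task reports the target state or the timeout elapses.

    get_task_states is a hypothetical caller-supplied function returning an
    iterable of state strings such as "RUNNING" or "PAUSED".
    Returns True if all tasks reached the target state, False on timeout.
    """
    deadline = clock() + timeout
    while True:
        states = list(get_task_states())
        # An empty task list is treated as "not there yet", not as success.
        if states and all(state == target for state in states):
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Usage with canned snapshots standing in for real status lookups.
snapshots = iter([["RUNNING", "RUNNING"], ["PAUSED", "RUNNING"], ["PAUSED", "PAUSED"]])
print(wait_for_state(lambda: next(snapshots), "PAUSED", interval=0))
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real delays.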