Kafka Ingestion

As more organizations rely on streaming data for real-time insights, users need to be able to discover, integrate, and ingest all available data from the sources that produce it, as fast as it is produced, in any format, and at any quality. Infoworks now supports ingestion of streaming data into our customers' data lakes. The data is stored in either ORC or Parquet format and is kept up to date through incremental data synchronization from Kafka.

Prerequisites and Considerations

  • A process that writes incremental data to a Kafka cluster or to MapR Streams must be available.
  • Spark must be set up on the cluster.
  • The initial bulk load for the target table must be completed.
  • The order of the columns in the stream must remain the same as it was during the initial bulk load.
  • Each topic in Kafka/MapR Streams is treated as a table in Infoworks.
  • Infoworks supports Kafka records in delimited format and JSON format (if the source type is JSON).
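
The column-order requirement is easiest to see with a small sketch. The following Python snippet is illustrative only and is not Infoworks code: the column list, the comma delimiter, and the helper names are assumptions. It shows that a delimited record carries no column names, so values can only be mapped by position, whereas a JSON record carries its own field names.

```python
import json

# Hypothetical column order captured during the initial bulk load.
COLUMNS = ["id", "name", "amount"]

def parse_delimited(record, columns, delimiter=","):
    """Map a delimited record to columns purely by position."""
    values = record.split(delimiter)
    if len(values) != len(columns):
        raise ValueError(
            f"expected {len(columns)} fields, got {len(values)}"
        )
    return dict(zip(columns, values))

def parse_json(record, columns):
    """Map a JSON record to columns by field name."""
    obj = json.loads(record)
    return {column: obj.get(column) for column in columns}

# Delimited values are positional: swapping two fields in the stream
# would silently put data in the wrong column.
row_a = parse_delimited("42,widget,9.99", COLUMNS)

# JSON fields are looked up by name, so their order in the message
# does not matter in this sketch.
row_b = parse_json('{"name": "widget", "amount": 9.99, "id": 42}', COLUMNS)
```

Note that the delimited parser returns every value as a string; any type conversion would have to follow the table schema.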

Running Streaming Job

Before starting ingestion from Kafka, you must configure a table for streaming ingestion as follows:

  • Navigate to the table configuration page, click the Streaming Configuration tab, and select Enable Streaming.

Select the streaming engine and the configurations as follows:

  • Infoworks supports Spark as the streaming engine.
  • Spark can run the job in YARN or Standalone mode; select the mode that matches your cluster configuration.
  • Select the Spark streaming duration based on the use case.
  • Enter the Kafka/MapR Streams topic names or patterns associated with the corresponding table.

A topic name can be:

  • a single name, for example, Topic1.
  • a list, for example, Topic1,Topic2,Topic3.
  • a pattern, for example, Topic(.)*. In this case, all topics that match the pattern are subscribed. All topics that match the pattern are assumed to have records with the same schema. Any new topic that is added to the message queue and matches the pattern is subscribed automatically.

Then save the configuration and run the job as follows:

  • Save the configuration.
  • Navigate to the streaming tab to view all the tables configured for streaming.
  • Click the start icon to start the streaming job.
  • Check the streaming status to see the state of the job.
  • Click the stop icon to stop the streaming job.
  • Click the Click to view logs option to view the job summary.
  • Records fetched by the streaming job can be queried through the Hive view named Realtime_<table name>. For example, for the TB_UPDATES table, the real-time view name is Realtime_TB_UPDATES.
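
The three topic forms described above can be sketched in a few lines of Python. This is only an approximation of the behavior, not the Infoworks implementation: the function name resolve_topics and the sample topic list are hypothetical, and the pattern is assumed to have to match the whole topic name.

```python
import re

def resolve_topics(spec, available):
    """Return the topics in `available` that `spec` selects.

    `spec` is a single name, a comma-separated list, or a pattern.
    """
    if "," in spec:
        # List form: Topic1,Topic2,Topic3
        wanted = {name.strip() for name in spec.split(",") if name.strip()}
        return [topic for topic in available if topic in wanted]
    # A single plain name is also a pattern that matches only itself,
    # so the single and pattern forms share one code path. Caveat: a
    # name containing regex metacharacters such as "." is interpreted
    # as a pattern in this sketch.
    pattern = re.compile(spec)
    return [topic for topic in available if pattern.fullmatch(topic)]

# Hypothetical set of topics currently present in the message queue.
available = ["Topic1", "Topic2", "Topic3", "Orders"]

single = resolve_topics("Topic1", available)
listed = resolve_topics("Topic1,Topic3", available)
matched = resolve_topics("Topic(.)*", available)
```

Calling resolve_topics again after a new matching topic appears in available picks it up, which mirrors the automatic subscription described for patterns.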

Configurations

  • READ_MESSAGES_FROM: Source of the messages, either Kafka or MapR Streams. The default value is Kafka.
  • spark_home: Spark installation path (Mandatory).
  • zookeeper_hosts: Zookeeper host (Mandatory).
  • zookeeper_port: Zookeeper port (Mandatory).
  • BOOTSTRAP_SERVERS_WITH_PORT: Kafka Bootstrap servers, comma-separated list of values. For example, 127.0.0.1:9092,127.0.0.2:9092. (Mandatory)
  • USE_RECEIVER_BASED_APPROACH: Whether to use the Kafka low-level APIs. The default value is false.
  • consumer_force_from_start: Whether to force the Kafka consumer to start from the beginning. The default value is false.
  • number_of_receivers: Number of Kafka receivers. The default value is 1.
  • SPARK_STREAM_WINDOW_DURATION_IN_SECONDS: Spark stream window duration in seconds. (Source or table level)
  • SPARK_STREAM_SLIDING_INTERVAL_IN_SECONDS: Spark stream sliding interval in seconds (source or table level).
  • spark_master: Spark master, in case of Standalone or Mesos mode.
  • SPARK_STREAM_YARN_QUEUE: Name of the YARN queue to which the Spark streaming job is submitted.
  • SPARK_STREAM_DRIVER_MEMORY: Driver memory for the Spark streaming job. For example, 1g, 2g. The default value is 1g.
  • SPARK_STREAM_EXECUTOR_MEMORY: Executor memory for the Spark streaming job. For example, 1g, 2g. The default value is 1g.
  • SPARK_STREAM_DYNAMIC_ALLCOATION: Spark dynamic resource allocation. The default value is false.
  • SPARK_STREAM_EXECUTOR_CORES: Number of cores in each executor. The default value is 1.
  • KAFKA_GROUP_ID: Kafka consumer group ID. If the ID is not specified, Infoworks creates an ID.
  • KAFKA_KEY_DESERIALIZER: Custom key deserializer. You can implement org.apache.kafka.common.serialization.Deserializer to write your own deserializer, and configure that class to be used by Infoworks. The default value is StringDeserializer.
  • KAFKA_VALUE_DESERIALIZER: Custom value deserializer. You can implement org.apache.kafka.common.serialization.Deserializer to write your own deserializer, and configure that class to be used by Infoworks. The default value is StringDeserializer.
  • SASL_MECHANISM: SASL mechanism used to connect to Kafka.
  • SECURITY_PROTOCOL: Security protocol used to connect to Kafka.
  • SSL_TRUST_STORE_LOCATION: Truststore location (truststore must be present in same location on all the nodes).
  • SSL_TRUST_STORE_PASSWORD: Truststore password.
  • KERBEROS_KEYTAB_FILE_PATH: Kerberos keytab file path.
  • KERBEROS_PRINCIPAL: Kerberos principal to be used by the streaming ingestion job.
  • DRIVER_JAVA_OPTIONS: Additional Java options to be passed to the Spark streaming driver.
  • EXECUTOR_JAVA_OPTIONS: Additional Java options to be passed to the Spark streaming executors.
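
As a checklist for the keys above, the following Python sketch verifies that the mandatory keys are present and fills in the documented defaults. It is not part of Infoworks: the function names, the use of string values, and the sample values (such as /opt/spark and port 2181) are assumptions made for illustration. The key names, including SPARK_STREAM_DYNAMIC_ALLCOATION, are copied as they are listed above.

```python
# Keys marked as mandatory in the list above.
MANDATORY = (
    "spark_home",
    "zookeeper_hosts",
    "zookeeper_port",
    "BOOTSTRAP_SERVERS_WITH_PORT",
)

# Defaults stated in the list above (values kept as strings here).
DEFAULTS = {
    "READ_MESSAGES_FROM": "Kafka",
    "USE_RECEIVER_BASED_APPROACH": "false",
    "consumer_force_from_start": "false",
    "number_of_receivers": "1",
    "SPARK_STREAM_DRIVER_MEMORY": "1g",
    "SPARK_STREAM_EXECUTOR_MEMORY": "1g",
    "SPARK_STREAM_DYNAMIC_ALLCOATION": "false",
    "SPARK_STREAM_EXECUTOR_CORES": "1",
}

def effective_config(user_config):
    """Check mandatory keys, then overlay user values on the defaults."""
    missing = [key for key in MANDATORY if not user_config.get(key)]
    if missing:
        raise ValueError("missing mandatory keys: " + ", ".join(missing))
    return {**DEFAULTS, **user_config}

def parse_bootstrap_servers(value):
    """Split a comma-separated host:port list into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers

# Hypothetical values for a small test cluster.
config = effective_config({
    "spark_home": "/opt/spark",
    "zookeeper_hosts": "127.0.0.1",
    "zookeeper_port": "2181",
    "BOOTSTRAP_SERVERS_WITH_PORT": "127.0.0.1:9092,127.0.0.2:9092",
    "SPARK_STREAM_EXECUTOR_MEMORY": "2g",
})
servers = parse_bootstrap_servers(config["BOOTSTRAP_SERVERS_WITH_PORT"])
```

Values supplied by the user override the defaults, so in this example the executor memory is 2g while the driver memory stays at 1g.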