KafkaConsumer.poll() を使用して Kafka コンシューマ エントリ ポイントをインストゥルメント化するには、コンシューマがカスタムインターセプタ定義のループ内のメッセージを読み取るメソッドを特定します。各メッセージのビジネストランザクションを開始して終了するために、反復子の次のメソッドがインストゥルメント化されます。反復するメッセージに使用される反復子はさまざまなものが考えられますが、次のタイプの反復子のみがサポートされています。
kafka.consumer.ConsumerIterator
org.apache.kafka.clients.consumer.ConsumerRecords$Concatenated Iterable$1
- Kafkaからメッセージを処理するループのクラスとメソッドを識別します。
たとえば、以下のループを使用して Kafka からメッセージをポーリングして処理するクラス
MyConsumer があるとします。
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
pollMessages(records);
private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
//AppDynamics instrumentation gets applied here
for (ConsumerRecord<String, String> record : records) {
//Processing of the records
System.out.println(record.value());
}
}
この場合、次をインターセプトします。
- クラス:
MyConsumer
- メソッド:
pollMessages
また、このインターセプタは、ループだけでなく個々のレコードを処理するメソッドにも適用できます。例:
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
pollMessages(records);
private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
//AppDynamics instrumentation gets applied here
for (ConsumerRecord<String, String> record : records) {
processRecord(record)
}
}
- 任意のテキストエディタを使用して、次のパスで custom-interceptors.xml という名前のファイルを作成および編集します。例:
<agent_home>/<version_number>/conf
例:
/usr/home/appdynamics/appagent/ver4.3.1.0/conf/custom-interceptors.xml
- 以下のXMLをcustom-interceptors.xmlにコピーします:
l
<custom-interceptors>
<custom-interceptor>
<interceptor-class-name>com.singularity.KafkaMarkerMethodInterceptor</interceptor-class-name>
<match-class type="matches-class">
<name filter-type="equals">my-fully-qualified-class-name</name>
</match-class>
<match-method>
<name>my-method-name</name>
</match-method>
</custom-interceptor>
</custom-interceptors>
- クラス名の値をコンシューマクラスの名前に設定します。
たとえば、
MyConsumer クラスを指定するには、次のようにします。
<match-class type="matches-class">
<name filter-type="equals">com.mycompany.mypackage.MyConsumer</name>
</match-class>
- メソッド名の値をメッセージ処理ループメソッドの名前に設定します。
たとえば、
pollMessages メソッドを指定するには、次のようにします。
<match-method>
<name>pollMessages</name>
</match-method>
Javaエージェントは構成の更新を読み取ると、コンシューマアクティビティとアップストリームKafkaキューを検出します。アプリケーションフローマップには、Kafkaキューからデータを受信するティアが表示されます。